From db57f938fca22cf8a221e54a9eac7dda9dcf1f11 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 16 Jul 2025 19:15:32 +0300 Subject: [PATCH 1/2] Add new stream commands --- redis/commands/core.py | 66 +++++++++++- tests/test_asyncio/test_commands.py | 150 ++++++++++++++++++++++++++++ tests/test_commands.py | 139 ++++++++++++++++++++++++++ 3 files changed, 353 insertions(+), 2 deletions(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index fa448aaac3..5ebc1ac8f6 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3484,6 +3484,28 @@ def xack(self, name: KeyT, groupname: GroupT, *ids: StreamIdT) -> ResponseT: """ return self.execute_command("XACK", name, groupname, *ids) + def xackdel( + self, + name: KeyT, + groupname: GroupT, + *ids: StreamIdT, + ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF", + ) -> ResponseT: + """ + Combines the functionality of XACK and XDEL. Acknowledges the specified + message IDs in the given consumer group and simultaneously attempts to + delete the corresponding entries from the stream. + """ + if not ids: + raise DataError("XACKDEL requires at least one message ID") + + if ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + raise DataError("XACKDEL ref_policy must be one of: KEEPREF, DELREF, ACKED") + + pieces = [name, groupname, ref_policy, "IDS", len(ids)] + pieces.extend(ids) + return self.execute_command("XACKDEL", *pieces) + def xadd( self, name: KeyT, @@ -3494,6 +3516,7 @@ def xadd( nomkstream: bool = False, minid: Union[StreamIdT, None] = None, limit: Optional[int] = None, + ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None, ) -> ResponseT: """ Add to a stream. @@ -3507,6 +3530,10 @@ def xadd( minid: the minimum id in the stream to query. Can't be specified with maxlen. limit: specifies the maximum number of entries to retrieve + ref_policy: optional reference policy for consumer groups when trimming: + - KEEPREF (default): When trimming, preserves references in consumer groups' PEL + - DELREF: When trimming, removes all references from consumer groups' PEL + - ACKED: When trimming, only removes entries acknowledged by all consumer groups For more information see https://redis.io/commands/xadd """ @@ -3514,6 +3541,11 @@ def xadd( if maxlen is not None and minid is not None: raise DataError("Only one of ```maxlen``` or ```minid``` may be specified") + if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED") + + if nomkstream: + pieces.append(b"NOMKSTREAM") if maxlen is not None: if not isinstance(maxlen, int) or maxlen < 0: raise DataError("XADD maxlen must be non-negative integer") @@ -3528,8 +3560,8 @@ def xadd( pieces.append(minid) if limit is not None: pieces.extend([b"LIMIT", limit]) - if nomkstream: - pieces.append(b"NOMKSTREAM") + if ref_policy is not None: + pieces.append(ref_policy) pieces.append(id) if not isinstance(fields, dict) or len(fields) == 0: raise DataError("XADD fields must be a non-empty dict") @@ -3683,6 +3715,26 @@ def xdel(self, name: KeyT, *ids: StreamIdT) -> ResponseT: """ return self.execute_command("XDEL", name, *ids) + def xdelex( + self, + name: KeyT, + *ids: StreamIdT, + ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF", + ) -> ResponseT: + """ + Extended version of XDEL that provides more control over how message entries + are deleted concerning consumer groups. + """ + if not ids: + raise DataError("XDELEX requires at least one message ID") + + if ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + raise DataError("XDELEX ref_policy must be one of: KEEPREF, DELREF, ACKED") + + pieces = [name, ref_policy, "IDS", len(ids)] + pieces.extend(ids) + return self.execute_command("XDELEX", *pieces) + def xgroup_create( self, name: KeyT, @@ -4034,6 +4086,7 @@ def xtrim( approximate: bool = True, minid: Union[StreamIdT, None] = None, limit: Optional[int] = None, + ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None, ) -> ResponseT: """ Trims old messages from a stream. @@ -4044,6 +4097,10 @@ def xtrim( minid: the minimum id in the stream to query Can't be specified with maxlen. limit: specifies the maximum number of entries to retrieve + ref_policy: optional reference policy for consumer groups: + - KEEPREF (default): Trims entries but preserves references in consumer groups' PEL + - DELREF: Trims entries and removes all references from consumer groups' PEL + - ACKED: Only trims entries that were read and acknowledged by all consumer groups For more information see https://redis.io/commands/xtrim """ @@ -4054,6 +4111,9 @@ def xtrim( if maxlen is None and minid is None: raise DataError("One of ``maxlen`` or ``minid`` must be specified") + if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + raise DataError("XTRIM ref_policy must be one of: KEEPREF, DELREF, ACKED") + if maxlen is not None: pieces.append(b"MAXLEN") if minid is not None: @@ -4067,6 +4127,8 @@ def xtrim( if limit is not None: pieces.append(b"LIMIT") pieces.append(limit) + if ref_policy is not None: + pieces.append(ref_policy) return self.execute_command("XTRIM", name, *pieces) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index bf65210f31..9db7d200e6 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis): # 1 message is trimmed assert await r.xtrim(stream, 3, approximate=False) == 1 + @skip_if_server_version_lt("8.1.224") + async def test_xdelex(self, r: redis.Redis): + stream = "stream" + + m1 = await r.xadd(stream, {"foo": "bar"}) + m2 = await r.xadd(stream, {"foo": "bar"}) + m3 = await r.xadd(stream, {"foo": "bar"}) + m4 = await r.xadd(stream, {"foo": "bar"}) + + # Test XDELEX with default ref_policy (KEEPREF) + result = await r.xdelex(stream, m1) + assert result == [1] + + # Test XDELEX with explicit KEEPREF + result = await r.xdelex(stream, m2, ref_policy="KEEPREF") + assert result == [1] + + # Test XDELEX with DELREF + result = await r.xdelex(stream, m3, ref_policy="DELREF") + assert result == [1] + + # Test XDELEX with ACKED + result = await r.xdelex(stream, m4, ref_policy="ACKED") + assert result == [1] + + # Test with non-existent ID + result = await r.xdelex(stream, "999999-0", ref_policy="KEEPREF") + assert result == [-1] + + # Test with multiple IDs + m5 = await r.xadd(stream, {"foo": "bar"}) + m6 = await r.xadd(stream, {"foo": "bar"}) + result = await r.xdelex(stream, m5, m6, ref_policy="KEEPREF") + assert result == [1, 1] + + # Test error cases + with pytest.raises(redis.DataError): + await r.xdelex(stream, "123-0", ref_policy="INVALID") + + with pytest.raises(redis.DataError): + await r.xdelex(stream) # No IDs provided + + @skip_if_server_version_lt("8.1.224") + async def test_xackdel(self, r: redis.Redis): + stream = "stream" + group = "group" + consumer = "consumer" + + m1 = await r.xadd(stream, {"foo": "bar"}) + m2 = await r.xadd(stream, {"foo": "bar"}) + m3 = await r.xadd(stream, {"foo": "bar"}) + m4 = await r.xadd(stream, {"foo": "bar"}) + await r.xgroup_create(stream, group, 0) + + await r.xreadgroup(group, consumer, streams={stream: ">"}) + + # Test XACKDEL with default ref_policy (KEEPREF) + result = await r.xackdel(stream, group, m1) + assert result == [1] + + # Test XACKDEL with explicit KEEPREF + result = await r.xackdel(stream, group, m2, ref_policy="KEEPREF") + assert result == [1] + + # Test XACKDEL with DELREF + result = await r.xackdel(stream, group, m3, ref_policy="DELREF") + assert result == [1] + + # Test XACKDEL with ACKED + result = await r.xackdel(stream, group, m4, ref_policy="ACKED") + assert result == [1] + + # Test with non-existent ID + result = await r.xackdel(stream, group, "999999-0", ref_policy="KEEPREF") + assert result == [-1] + + # Test error cases + with pytest.raises(redis.DataError): + await r.xackdel(stream, group, m1, ref_policy="INVALID") + + with pytest.raises(redis.DataError): + await r.xackdel(stream, group) # No IDs provided + + @skip_if_server_version_lt("8.1.224") + async def test_xtrim_with_options(self, r: redis.Redis): + stream = "stream" + + await r.xadd(stream, {"foo": "bar"}) + await r.xadd(stream, {"foo": "bar"}) + await r.xadd(stream, {"foo": "bar"}) + await r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with KEEPREF ref_policy + assert ( + await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="KEEPREF") + == 2 + ) + + await r.xadd(stream, {"foo": "bar"}) + await r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with DELREF ref_policy + assert ( + await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="DELREF") == 2 + ) + + await r.xadd(stream, {"foo": "bar"}) + await r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with ACKED ref_policy + assert ( + await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="ACKED") == 2 + ) + + # Test error case + with pytest.raises(redis.DataError): + await r.xtrim(stream, maxlen=2, ref_policy="INVALID") + + @skip_if_server_version_lt("8.1.224") + async def test_xadd_with_options(self, r: redis.Redis): + stream = "stream" + + # Test XADD with KEEPREF ref_policy + await r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + await r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + await r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + assert await r.xlen(stream) == 2 + + # Test XADD with DELREF ref_policy + await r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF" + ) + assert await r.xlen(stream) == 2 + + # Test XADD with ACKED ref_policy + await r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED" + ) + assert await r.xlen(stream) == 2 + + # Test error case + with pytest.raises(redis.DataError): + await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID") + @pytest.mark.onlynoncluster async def test_bitfield_operations(self, r: redis.Redis): # comments show affected bits diff --git a/tests/test_commands.py b/tests/test_commands.py index 9ac8ee4933..42530a47d2 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -5096,6 +5096,145 @@ def test_xtrim_minlen_and_length_args(self, r): r.xadd(stream, {"foo": "bar"}) assert r.xtrim(stream, None, approximate=True, minid=m3) == 0 + @skip_if_server_version_lt("8.1.224") + def test_xdelex(self, r): + stream = "stream" + + m1 = r.xadd(stream, {"foo": "bar"}) + m2 = r.xadd(stream, {"foo": "bar"}) + m3 = r.xadd(stream, {"foo": "bar"}) + m4 = r.xadd(stream, {"foo": "bar"}) + + # Test XDELEX with default ref_policy (KEEPREF) + result = r.xdelex(stream, m1) + assert result == [1] + + # Test XDELEX with explicit KEEPREF + result = r.xdelex(stream, m2, ref_policy="KEEPREF") + assert result == [1] + + # Test XDELEX with DELREF + result = r.xdelex(stream, m3, ref_policy="DELREF") + assert result == [1] + + # Test XDELEX with ACKED + result = r.xdelex(stream, m4, ref_policy="ACKED") + assert result == [1] + + # Test with non-existent ID + result = r.xdelex(stream, "999999-0", ref_policy="KEEPREF") + assert result == [-1] + + # Test with multiple IDs + m5 = r.xadd(stream, {"foo": "bar"}) + m6 = r.xadd(stream, {"foo": "bar"}) + result = r.xdelex(stream, m5, m6, ref_policy="KEEPREF") + assert result == [1, 1] # Both entries deleted + + # Test error cases + with pytest.raises(redis.DataError): + r.xdelex(stream, "123-0", ref_policy="INVALID") + + with pytest.raises(redis.DataError): + r.xdelex(stream) # No IDs provided + + @skip_if_server_version_lt("8.1.224") + def test_xackdel(self, r): + stream = "stream" + group = "group" + consumer = "consumer" + + m1 = r.xadd(stream, {"foo": "bar"}) + m2 = r.xadd(stream, {"foo": "bar"}) + m3 = r.xadd(stream, {"foo": "bar"}) + m4 = r.xadd(stream, {"foo": "bar"}) + r.xgroup_create(stream, group, 0) + + r.xreadgroup(group, consumer, streams={stream: ">"}) + + # Test XACKDEL with default ref_policy (KEEPREF) + result = r.xackdel(stream, group, m1) + assert result == [1] + + # Test XACKDEL with explicit KEEPREF + result = r.xackdel(stream, group, m2, ref_policy="KEEPREF") + assert result == [1] + + # Test XACKDEL with DELREF + result = r.xackdel(stream, group, m3, ref_policy="DELREF") + assert result == [1] + + # Test XACKDEL with ACKED + result = r.xackdel(stream, group, m4, ref_policy="ACKED") + assert result == [1] + + # Test with non-existent ID + result = r.xackdel(stream, group, "999999-0", ref_policy="KEEPREF") + assert result == [-1] + + # Test error cases + with pytest.raises(redis.DataError): + r.xackdel(stream, group, m1, ref_policy="INVALID") + + with pytest.raises(redis.DataError): + r.xackdel(stream, group) # No IDs provided + + @skip_if_server_version_lt("8.1.224") + def test_xtrim_with_options(self, r): + stream = "stream" + + r.xadd(stream, {"foo": "bar"}) + r.xadd(stream, {"foo": "bar"}) + r.xadd(stream, {"foo": "bar"}) + r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with KEEPREF ref_policy + assert r.xtrim(stream, maxlen=2, approximate=False, ref_policy="KEEPREF") == 2 + + r.xadd(stream, {"foo": "bar"}) + r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with DELREF ref_policy + assert r.xtrim(stream, maxlen=2, approximate=False, ref_policy="DELREF") == 2 + + r.xadd(stream, {"foo": "bar"}) + r.xadd(stream, {"foo": "bar"}) + + # Test XTRIM with ACKED ref_policy + assert r.xtrim(stream, maxlen=2, approximate=False, ref_policy="ACKED") == 2 + + # Test error case + with pytest.raises(redis.DataError): + r.xtrim(stream, maxlen=2, ref_policy="INVALID") + + @skip_if_server_version_lt("8.1.224") + def test_xadd_with_options(self, r): + stream = "stream" + + # Test XADD with KEEPREF ref_policy + r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + r.xadd( + stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF" + ) + assert r.xlen(stream) == 2 + + # Test XADD with DELREF ref_policy + r.xadd(stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF") + assert r.xlen(stream) == 2 + + # Test XADD with ACKED ref_policy + r.xadd(stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED") + assert r.xlen(stream) == 2 + + # Test error case + with pytest.raises(redis.DataError): + r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID") + def test_bitfield_operations(self, r): # comments show affected bits bf = r.bitfield("a") From 477b9f914cbca2a7e0c602918e40087b1014b871 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Thu, 24 Jul 2025 17:16:20 +0300 Subject: [PATCH 2/2] optimization changes --- redis/commands/core.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index 5ebc1ac8f6..d6fb550724 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3499,7 +3499,7 @@ def xackdel( if not ids: raise DataError("XACKDEL requires at least one message ID") - if ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}: raise DataError("XACKDEL ref_policy must be one of: KEEPREF, DELREF, ACKED") pieces = [name, groupname, ref_policy, "IDS", len(ids)] @@ -3541,11 +3541,9 @@ def xadd( if maxlen is not None and minid is not None: raise DataError("Only one of ```maxlen``` or ```minid``` may be specified") - if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}: raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED") - if nomkstream: - pieces.append(b"NOMKSTREAM") if maxlen is not None: if not isinstance(maxlen, int) or maxlen < 0: raise DataError("XADD maxlen must be non-negative integer") @@ -3560,6 +3558,8 @@ def xadd( pieces.append(minid) if limit is not None: pieces.extend([b"LIMIT", limit]) + if nomkstream: + pieces.append(b"NOMKSTREAM") if ref_policy is not None: pieces.append(ref_policy) pieces.append(id) @@ -3728,7 +3728,7 @@ def xdelex( if not ids: raise DataError("XDELEX requires at least one message ID") - if ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}: raise DataError("XDELEX ref_policy must be one of: KEEPREF, DELREF, ACKED") pieces = [name, ref_policy, "IDS", len(ids)] @@ -4111,7 +4111,7 @@ def xtrim( if maxlen is None and minid is None: raise DataError("One of ``maxlen`` or ``minid`` must be specified") - if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"): + if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}: raise DataError("XTRIM ref_policy must be one of: KEEPREF, DELREF, ACKED") if maxlen is not None: