Skip to content

Add new stream commands #3711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3484,6 +3484,28 @@ def xack(self, name: KeyT, groupname: GroupT, *ids: StreamIdT) -> ResponseT:
"""
return self.execute_command("XACK", name, groupname, *ids)

def xackdel(
self,
name: KeyT,
groupname: GroupT,
*ids: StreamIdT,
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
) -> ResponseT:
"""
Combines the functionality of XACK and XDEL. Acknowledges the specified
message IDs in the given consumer group and simultaneously attempts to
delete the corresponding entries from the stream.
"""
if not ids:
raise DataError("XACKDEL requires at least one message ID")

if ref_policy not in ("KEEPREF", "DELREF", "ACKED"):
raise DataError("XACKDEL ref_policy must be one of: KEEPREF, DELREF, ACKED")

pieces = [name, groupname, ref_policy, "IDS", len(ids)]
pieces.extend(ids)
return self.execute_command("XACKDEL", *pieces)

def xadd(
self,
name: KeyT,
Expand All @@ -3494,6 +3516,7 @@ def xadd(
nomkstream: bool = False,
minid: Union[StreamIdT, None] = None,
limit: Optional[int] = None,
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
) -> ResponseT:
"""
Add to a stream.
Expand All @@ -3507,13 +3530,22 @@ def xadd(
minid: the minimum id in the stream to query.
Can't be specified with maxlen.
limit: specifies the maximum number of entries to retrieve
ref_policy: optional reference policy for consumer groups when trimming:
- KEEPREF (default): When trimming, preserves references in consumer groups' PEL
- DELREF: When trimming, removes all references from consumer groups' PEL
- ACKED: When trimming, only removes entries acknowledged by all consumer groups

For more information see https://redis.io/commands/xadd
"""
pieces: list[EncodableT] = []
if maxlen is not None and minid is not None:
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")

if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"):
raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED")

if nomkstream:
pieces.append(b"NOMKSTREAM")
if maxlen is not None:
if not isinstance(maxlen, int) or maxlen < 0:
raise DataError("XADD maxlen must be non-negative integer")
Expand All @@ -3528,8 +3560,8 @@ def xadd(
pieces.append(minid)
if limit is not None:
pieces.extend([b"LIMIT", limit])
if nomkstream:
pieces.append(b"NOMKSTREAM")
if ref_policy is not None:
pieces.append(ref_policy)
pieces.append(id)
if not isinstance(fields, dict) or len(fields) == 0:
raise DataError("XADD fields must be a non-empty dict")
Expand Down Expand Up @@ -3683,6 +3715,26 @@ def xdel(self, name: KeyT, *ids: StreamIdT) -> ResponseT:
"""
return self.execute_command("XDEL", name, *ids)

def xdelex(
self,
name: KeyT,
*ids: StreamIdT,
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
) -> ResponseT:
"""
Extended version of XDEL that provides more control over how message entries
are deleted concerning consumer groups.
"""
if not ids:
raise DataError("XDELEX requires at least one message ID")

if ref_policy not in ("KEEPREF", "DELREF", "ACKED"):
raise DataError("XDELEX ref_policy must be one of: KEEPREF, DELREF, ACKED")

pieces = [name, ref_policy, "IDS", len(ids)]
pieces.extend(ids)
return self.execute_command("XDELEX", *pieces)

def xgroup_create(
self,
name: KeyT,
Expand Down Expand Up @@ -4034,6 +4086,7 @@ def xtrim(
approximate: bool = True,
minid: Union[StreamIdT, None] = None,
limit: Optional[int] = None,
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
) -> ResponseT:
"""
Trims old messages from a stream.
Expand All @@ -4044,6 +4097,10 @@ def xtrim(
minid: the minimum id in the stream to query
Can't be specified with maxlen.
limit: specifies the maximum number of entries to retrieve
ref_policy: optional reference policy for consumer groups:
- KEEPREF (default): Trims entries but preserves references in consumer groups' PEL
- DELREF: Trims entries and removes all references from consumer groups' PEL
- ACKED: Only trims entries that were read and acknowledged by all consumer groups

For more information see https://redis.io/commands/xtrim
"""
Expand All @@ -4054,6 +4111,9 @@ def xtrim(
if maxlen is None and minid is None:
raise DataError("One of ``maxlen`` or ``minid`` must be specified")

if ref_policy is not None and ref_policy not in ("KEEPREF", "DELREF", "ACKED"):
raise DataError("XTRIM ref_policy must be one of: KEEPREF, DELREF, ACKED")

if maxlen is not None:
pieces.append(b"MAXLEN")
if minid is not None:
Expand All @@ -4067,6 +4127,8 @@ def xtrim(
if limit is not None:
pieces.append(b"LIMIT")
pieces.append(limit)
if ref_policy is not None:
pieces.append(ref_policy)

return self.execute_command("XTRIM", name, *pieces)

Expand Down
150 changes: 150 additions & 0 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis):
# 1 message is trimmed
assert await r.xtrim(stream, 3, approximate=False) == 1

@skip_if_server_version_lt("8.1.224")
async def test_xdelex(self, r: redis.Redis):
stream = "stream"

m1 = await r.xadd(stream, {"foo": "bar"})
m2 = await r.xadd(stream, {"foo": "bar"})
m3 = await r.xadd(stream, {"foo": "bar"})
m4 = await r.xadd(stream, {"foo": "bar"})

# Test XDELEX with default ref_policy (KEEPREF)
result = await r.xdelex(stream, m1)
assert result == [1]

# Test XDELEX with explicit KEEPREF
result = await r.xdelex(stream, m2, ref_policy="KEEPREF")
assert result == [1]

# Test XDELEX with DELREF
result = await r.xdelex(stream, m3, ref_policy="DELREF")
assert result == [1]

# Test XDELEX with ACKED
result = await r.xdelex(stream, m4, ref_policy="ACKED")
assert result == [1]

# Test with non-existent ID
result = await r.xdelex(stream, "999999-0", ref_policy="KEEPREF")
assert result == [-1]

# Test with multiple IDs
m5 = await r.xadd(stream, {"foo": "bar"})
m6 = await r.xadd(stream, {"foo": "bar"})
result = await r.xdelex(stream, m5, m6, ref_policy="KEEPREF")
assert result == [1, 1]

# Test error cases
with pytest.raises(redis.DataError):
await r.xdelex(stream, "123-0", ref_policy="INVALID")

with pytest.raises(redis.DataError):
await r.xdelex(stream) # No IDs provided

@skip_if_server_version_lt("8.1.224")
async def test_xackdel(self, r: redis.Redis):
stream = "stream"
group = "group"
consumer = "consumer"

m1 = await r.xadd(stream, {"foo": "bar"})
m2 = await r.xadd(stream, {"foo": "bar"})
m3 = await r.xadd(stream, {"foo": "bar"})
m4 = await r.xadd(stream, {"foo": "bar"})
await r.xgroup_create(stream, group, 0)

await r.xreadgroup(group, consumer, streams={stream: ">"})

# Test XACKDEL with default ref_policy (KEEPREF)
result = await r.xackdel(stream, group, m1)
assert result == [1]

# Test XACKDEL with explicit KEEPREF
result = await r.xackdel(stream, group, m2, ref_policy="KEEPREF")
assert result == [1]

# Test XACKDEL with DELREF
result = await r.xackdel(stream, group, m3, ref_policy="DELREF")
assert result == [1]

# Test XACKDEL with ACKED
result = await r.xackdel(stream, group, m4, ref_policy="ACKED")
assert result == [1]

# Test with non-existent ID
result = await r.xackdel(stream, group, "999999-0", ref_policy="KEEPREF")
assert result == [-1]

# Test error cases
with pytest.raises(redis.DataError):
await r.xackdel(stream, group, m1, ref_policy="INVALID")

with pytest.raises(redis.DataError):
await r.xackdel(stream, group) # No IDs provided

@skip_if_server_version_lt("8.1.224")
async def test_xtrim_with_options(self, r: redis.Redis):
stream = "stream"

await r.xadd(stream, {"foo": "bar"})
await r.xadd(stream, {"foo": "bar"})
await r.xadd(stream, {"foo": "bar"})
await r.xadd(stream, {"foo": "bar"})

# Test XTRIM with KEEPREF ref_policy
assert (
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="KEEPREF")
== 2
)

await r.xadd(stream, {"foo": "bar"})
await r.xadd(stream, {"foo": "bar"})

# Test XTRIM with DELREF ref_policy
assert (
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="DELREF") == 2
)

await r.xadd(stream, {"foo": "bar"})
await r.xadd(stream, {"foo": "bar"})

# Test XTRIM with ACKED ref_policy
assert (
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="ACKED") == 2
)

# Test error case
with pytest.raises(redis.DataError):
await r.xtrim(stream, maxlen=2, ref_policy="INVALID")

@skip_if_server_version_lt("8.1.224")
async def test_xadd_with_options(self, r: redis.Redis):
stream = "stream"

# Test XADD with KEEPREF ref_policy
await r.xadd(
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
)
await r.xadd(
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
)
await r.xadd(
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
)
assert await r.xlen(stream) == 2

# Test XADD with DELREF ref_policy
await r.xadd(
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF"
)
assert await r.xlen(stream) == 2

# Test XADD with ACKED ref_policy
await r.xadd(
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED"
)
assert await r.xlen(stream) == 2

# Test error case
with pytest.raises(redis.DataError):
await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")

@pytest.mark.onlynoncluster
async def test_bitfield_operations(self, r: redis.Redis):
# comments show affected bits
Expand Down
Loading