Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions redis/_parsers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,16 @@ def sort_return_tuples(response, **options):
return list(zip(*[response[i::n] for i in range(n)]))


def parse_stream_list(response):
def parse_stream_list(response, **options):
if response is None:
return None
data = []
for r in response:
if r is not None:
data.append((r[0], pairs_to_dict(r[1])))
if "claim_min_idle_time" in options:
data.append((r[0], pairs_to_dict(r[1]), *r[2:]))
else:
data.append((r[0], pairs_to_dict(r[1])))
else:
data.append((None, None))
return data
Expand Down Expand Up @@ -332,16 +335,18 @@ def parse_xinfo_stream(response, **options):
return data


def parse_xread(response):
def parse_xread(response, **options):
if response is None:
return []
return [[r[0], parse_stream_list(r[1])] for r in response]
return [[r[0], parse_stream_list(r[1], **options)] for r in response]


def parse_xread_resp3(response):
def parse_xread_resp3(response, **options):
if response is None:
return {}
return {key: [parse_stream_list(value)] for key, value in response.items()}
return {
key: [parse_stream_list(value, **options)] for key, value in response.items()
}


def parse_xpending(response, **options):
Expand Down
15 changes: 14 additions & 1 deletion redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4101,6 +4101,7 @@ def xreadgroup(
count: Optional[int] = None,
block: Optional[int] = None,
noack: bool = False,
claim_min_idle_time: Optional[int] = None,
) -> ResponseT:
"""
Read from a stream via a consumer group.
Expand All @@ -4118,8 +4119,12 @@ def xreadgroup(
block: number of milliseconds to wait, if nothing already present.
noack: do not add messages to the PEL

claim_min_idle_time: accepts an integer type and represents a
time interval in milliseconds

For more information, see https://redis.io/commands/xreadgroup
"""
options = {}
pieces: list[EncodableT] = [b"GROUP", groupname, consumername]
if count is not None:
if not isinstance(count, int) or count < 1:
Expand All @@ -4133,12 +4138,20 @@ def xreadgroup(
pieces.append(str(block))
if noack:
pieces.append(b"NOACK")
if claim_min_idle_time is not None:
if not isinstance(claim_min_idle_time, int) or claim_min_idle_time < 0:
raise DataError(
"XREADGROUP claim_min_idle_time must be a non-negative integer"
)
pieces.append(b"CLAIM")
pieces.append(claim_min_idle_time)
options["claim_min_idle_time"] = claim_min_idle_time
if not isinstance(streams, dict) or len(streams) == 0:
raise DataError("XREADGROUP streams must be a non empty dict")
pieces.append(b"STREAMS")
pieces.extend(streams.keys())
pieces.extend(streams.values())
return self.execute_command("XREADGROUP", *pieces)
return self.execute_command("XREADGROUP", *pieces, **options)

def xrevrange(
self,
Expand Down
Loading