Skip to content

Commit bb7ae74

Browse files
authored
Adding CLAIM option to XREADGROUP command + unit tests (#3825)
1 parent 489932d commit bb7ae74

File tree

4 files changed

+697
-9
lines changed

4 files changed

+697
-9
lines changed

redis/_parsers/helpers.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,13 +268,16 @@ def sort_return_tuples(response, **options):
268268
return list(zip(*[response[i::n] for i in range(n)]))
269269

270270

271-
def parse_stream_list(response):
271+
def parse_stream_list(response, **options):
272272
if response is None:
273273
return None
274274
data = []
275275
for r in response:
276276
if r is not None:
277-
data.append((r[0], pairs_to_dict(r[1])))
277+
if "claim_min_idle_time" in options:
278+
data.append((r[0], pairs_to_dict(r[1]), *r[2:]))
279+
else:
280+
data.append((r[0], pairs_to_dict(r[1])))
278281
else:
279282
data.append((None, None))
280283
return data
@@ -332,16 +335,18 @@ def parse_xinfo_stream(response, **options):
332335
return data
333336

334337

335-
def parse_xread(response):
338+
def parse_xread(response, **options):
336339
if response is None:
337340
return []
338-
return [[r[0], parse_stream_list(r[1])] for r in response]
341+
return [[r[0], parse_stream_list(r[1], **options)] for r in response]
339342

340343

341-
def parse_xread_resp3(response):
344+
def parse_xread_resp3(response, **options):
342345
if response is None:
343346
return {}
344-
return {key: [parse_stream_list(value)] for key, value in response.items()}
347+
return {
348+
key: [parse_stream_list(value, **options)] for key, value in response.items()
349+
}
345350

346351

347352
def parse_xpending(response, **options):

redis/commands/core.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4101,6 +4101,7 @@ def xreadgroup(
41014101
count: Optional[int] = None,
41024102
block: Optional[int] = None,
41034103
noack: bool = False,
4104+
claim_min_idle_time: Optional[int] = None,
41044105
) -> ResponseT:
41054106
"""
41064107
Read from a stream via a consumer group.
@@ -4118,8 +4119,12 @@ def xreadgroup(
41184119
block: number of milliseconds to wait, if nothing already present.
41194120
noack: do not add messages to the PEL
41204121
4122+
claim_min_idle_time: accepts an integer type and represents a
4123+
time interval in milliseconds
4124+
41214125
For more information, see https://redis.io/commands/xreadgroup
41224126
"""
4127+
options = {}
41234128
pieces: list[EncodableT] = [b"GROUP", groupname, consumername]
41244129
if count is not None:
41254130
if not isinstance(count, int) or count < 1:
@@ -4133,12 +4138,20 @@ def xreadgroup(
41334138
pieces.append(str(block))
41344139
if noack:
41354140
pieces.append(b"NOACK")
4141+
if claim_min_idle_time is not None:
4142+
if not isinstance(claim_min_idle_time, int) or claim_min_idle_time < 0:
4143+
raise DataError(
4144+
"XREADGROUP claim_min_idle_time must be a non-negative integer"
4145+
)
4146+
pieces.append(b"CLAIM")
4147+
pieces.append(claim_min_idle_time)
4148+
options["claim_min_idle_time"] = claim_min_idle_time
41364149
if not isinstance(streams, dict) or len(streams) == 0:
41374150
raise DataError("XREADGROUP streams must be a non empty dict")
41384151
pieces.append(b"STREAMS")
41394152
pieces.extend(streams.keys())
41404153
pieces.extend(streams.values())
4141-
return self.execute_command("XREADGROUP", *pieces)
4154+
return self.execute_command("XREADGROUP", *pieces, **options)
41424155

41434156
def xrevrange(
41444157
self,

0 commit comments

Comments
 (0)