Skip to content

Commit 5a0a06d

Browse files
authored
feat(xreadgroup): add claim attribute (#3122)
* feat(xreadgroup): add claim attribute the CLAIM attribute can be used to instruct redis to return PEL ( Pending Entries List ) entries with their respective deliveries and ms since last delivery * remove m01 from test matrix * add jsdoc
1 parent 130e88d commit 5a0a06d

File tree

4 files changed

+106
-49
lines changed

4 files changed

+106
-49
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
fail-fast: false
2323
matrix:
2424
node-version: ["18", "20", "22"]
25-
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-M01-pre", "8.4-RC1-pre"]
25+
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-RC1-pre"]
2626
steps:
2727
- uses: actions/checkout@v4
2828
with:

packages/client/lib/commands/XREADGROUP.spec.ts

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,33 @@ describe('XREADGROUP', () => {
9393
['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'STREAMS', 'key', '0-0']
9494
);
9595
});
96+
97+
it('with CLAIM', () => {
98+
assert.deepEqual(
99+
parseArgs(XREADGROUP, 'group', 'consumer', {
100+
key: 'key',
101+
id: '0-0'
102+
}, {
103+
CLAIM: 100
104+
}),
105+
['XREADGROUP', 'GROUP', 'group', 'consumer', 'CLAIM', '100', 'STREAMS', 'key', '0-0']
106+
);
107+
});
108+
109+
it('with COUNT, BLOCK, NOACK, CLAIM', () => {
110+
assert.deepEqual(
111+
parseArgs(XREADGROUP, 'group', 'consumer', {
112+
key: 'key',
113+
id: '0-0'
114+
}, {
115+
COUNT: 1,
116+
BLOCK: 0,
117+
NOACK: true,
118+
CLAIM: 100
119+
}),
120+
['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'CLAIM', '100', 'STREAMS', 'key', '0-0']
121+
);
122+
});
96123
});
97124

98125
testUtils.testAll('xReadGroup - null', async client => {
@@ -156,35 +183,54 @@ describe('XREADGROUP', () => {
156183
cluster: GLOBAL.CLUSTERS.OPEN
157184
});
158185

159-
testUtils.testWithClient('client.xReadGroup should throw with resp3 and unstableResp3: false', async client => {
160-
assert.throws(
161-
() => client.xReadGroup('group', 'consumer', {
162-
key: 'key',
163-
id: '>'
186+
testUtils.testAll('xReadGroup - without CLAIM should not include delivery fields', async client => {
187+
const [, id] = await Promise.all([
188+
client.xGroupCreate('key', 'group', '$', {
189+
MKSTREAM: true
164190
}),
165-
{
166-
message: 'Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance'
167-
}
168-
);
169-
}, {
170-
...GLOBAL.SERVERS.OPEN,
171-
clientOptions: {
172-
RESP: 3
173-
}
174-
});
191+
client.xAdd('key', '*', { field: 'value' })
192+
]);
175193

176-
testUtils.testWithClient('client.xReadGroup should not throw with resp3 and unstableResp3: true', async client => {
177-
assert.doesNotThrow(
178-
() => client.xReadGroup('group', 'consumer', {
179-
key: 'key',
180-
id: '>'
181-
})
182-
);
194+
const readGroupReply = await client.xReadGroup('group', 'consumer', {
195+
key: 'key',
196+
id: '>'
197+
});
198+
199+
assert.ok(readGroupReply);
200+
assert.equal(readGroupReply[0].messages[0].millisElapsedFromDelivery, undefined);
201+
assert.equal(readGroupReply[0].messages[0].deliveriesCounter, undefined);
183202
}, {
184-
...GLOBAL.SERVERS.OPEN,
185-
clientOptions: {
186-
RESP: 3,
187-
unstableResp3: true
188-
}
203+
client: GLOBAL.SERVERS.OPEN,
204+
cluster: GLOBAL.CLUSTERS.OPEN
189205
});
206+
207+
testUtils.testWithClientIfVersionWithinRange([[8,4], 'LATEST'],'xReadGroup - with CLAIM should include delivery fields', async client => {
208+
const [, id] = await Promise.all([
209+
client.xGroupCreate('key', 'group', '$', {
210+
MKSTREAM: true
211+
}),
212+
client.xAdd('key', '*', { field: 'value' })
213+
]);
214+
215+
// First read to add message to PEL
216+
await client.xReadGroup('group', 'consumer', {
217+
key: 'key',
218+
id: '>'
219+
});
220+
221+
// Read with CLAIM to get delivery fields
222+
const readGroupReply = await client.xReadGroup('group', 'consumer2', {
223+
key: 'key',
224+
id: '>'
225+
}, {
226+
CLAIM: 0
227+
});
228+
229+
assert.ok(readGroupReply);
230+
assert.equal(readGroupReply[0].messages[0].id, id);
231+
assert.ok(readGroupReply[0].messages[0].millisElapsedFromDelivery !== undefined);
232+
assert.ok(readGroupReply[0].messages[0].deliveriesCounter !== undefined);
233+
assert.equal(typeof readGroupReply[0].messages[0].millisElapsedFromDelivery, 'number');
234+
assert.equal(typeof readGroupReply[0].messages[0].deliveriesCounter, 'number');
235+
}, GLOBAL.SERVERS.OPEN);
190236
});

packages/client/lib/commands/XREADGROUP.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@ import { transformStreamsMessagesReplyResp2 } from './generic-transformers';
55

66
/**
77
* Options for the XREADGROUP command
8-
*
8+
*
99
* @property COUNT - Limit the number of entries returned per stream
1010
* @property BLOCK - Milliseconds to block waiting for new entries (0 for indefinite)
1111
* @property NOACK - Skip adding the message to the PEL (Pending Entries List)
12+
* @property CLAIM - Prepend PEL entries that are at least this many milliseconds old
1213
*/
1314
export interface XReadGroupOptions {
1415
COUNT?: number;
1516
BLOCK?: number;
1617
NOACK?: boolean;
18+
CLAIM?: number;
1719
}
1820

1921
export default {
@@ -50,6 +52,10 @@ export default {
5052
parser.push('NOACK');
5153
}
5254

55+
if (options?.CLAIM !== undefined) {
56+
parser.push('CLAIM', options.CLAIM.toString());
57+
}
58+
5359
pushXReadStreams(parser, streams);
5460
},
5561
/**
@@ -59,5 +65,4 @@ export default {
5965
2: transformStreamsMessagesReplyResp2,
6066
3: undefined as unknown as () => ReplyUnion
6167
},
62-
unstableResp3: true,
6368
} as const satisfies Command;

packages/client/lib/commands/generic-transformers.ts

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export function transformStringDoubleArgument(num: RedisArgument | number): Redi
4646
export const transformDoubleReply = {
4747
2: (reply: BlobStringReply, preserve?: any, typeMapping?: TypeMapping): DoubleReply => {
4848
const double = typeMapping ? typeMapping[RESP_TYPES.DOUBLE] : undefined;
49-
49+
5050
switch (double) {
5151
case String: {
5252
return reply as unknown as DoubleReply;
@@ -58,13 +58,13 @@ export const transformDoubleReply = {
5858
case 'inf':
5959
case '+inf':
6060
ret = Infinity;
61-
61+
6262
case '-inf':
6363
ret = -Infinity;
64-
64+
6565
case 'nan':
6666
ret = NaN;
67-
67+
6868
default:
6969
ret = Number(reply);
7070
}
@@ -98,7 +98,7 @@ export function createTransformNullableDoubleReplyResp2Func(preserve?: any, type
9898
export const transformNullableDoubleReply = {
9999
2: (reply: BlobStringReply | NullReply, preserve?: any, typeMapping?: TypeMapping) => {
100100
if (reply === null) return null;
101-
101+
102102
return transformDoubleReply[2](reply as BlobStringReply, preserve, typeMapping);
103103
},
104104
3: undefined as unknown as () => DoubleReply | NullReply
@@ -514,19 +514,25 @@ export function parseArgs(command: Command, ...args: Array<any>): CommandArgumen
514514

515515
export type StreamMessageRawReply = TuplesReply<[
516516
id: BlobStringReply,
517-
message: ArrayReply<BlobStringReply>
517+
message: ArrayReply<BlobStringReply>,
518+
millisElapsedFromDelivery?: NumberReply,
519+
deliveriesCounter?: NumberReply
518520
]>;
519521

520522
export type StreamMessageReply = {
521523
id: BlobStringReply,
522524
message: MapReply<BlobStringReply | string, BlobStringReply>,
525+
millisElapsedFromDelivery?: number
526+
deliveriesCounter?: number
523527
};
524528

525529
export function transformStreamMessageReply(typeMapping: TypeMapping | undefined, reply: StreamMessageRawReply): StreamMessageReply {
526-
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
530+
const [ id, message, millisElapsedFromDelivery, deliveriesCounter ] = reply as unknown as UnwrapReply<typeof reply>;
527531
return {
528532
id: id,
529-
message: transformTuplesReply(message, undefined, typeMapping)
533+
message: transformTuplesReply(message, undefined, typeMapping),
534+
...(millisElapsedFromDelivery !== undefined ? { millisElapsedFromDelivery: Number(millisElapsedFromDelivery) } : {}),
535+
...(deliveriesCounter !== undefined ? { deliveriesCounter: Number(deliveriesCounter) } : {})
530536
};
531537
}
532538

@@ -557,7 +563,7 @@ export function transformStreamsMessagesReplyResp2(
557563
reply: UnwrapReply<StreamsMessagesRawReply2 | NullReply>,
558564
preserve?: any,
559565
typeMapping?: TypeMapping
560-
): StreamsMessagesReply | NullReply {
566+
): StreamsMessagesReply | NullReply {
561567
// FUTURE: resposne type if resp3 was working, reverting to old v4 for now
562568
//: MapReply<BlobStringReply | string, StreamMessagesReply> | NullReply {
563569
if (reply === null) return null as unknown as NullReply;
@@ -569,25 +575,25 @@ export function transformStreamsMessagesReplyResp2(
569575
570576
for (let i=0; i < reply.length; i++) {
571577
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
572-
578+
573579
const name = stream[0];
574580
const rawMessages = stream[1];
575-
581+
576582
ret.set(name.toString(), transformStreamMessagesReply(rawMessages, typeMapping));
577583
}
578-
584+
579585
return ret as unknown as MapReply<string, StreamMessagesReply>;
580586
}
581587
case Array: {
582588
const ret: Array<BlobStringReply | StreamMessagesReply> = [];
583589
584590
for (let i=0; i < reply.length; i++) {
585591
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
586-
592+
587593
const name = stream[0];
588594
const rawMessages = stream[1];
589-
590-
ret.push(name);
595+
596+
ret.push(name);
591597
ret.push(transformStreamMessagesReply(rawMessages, typeMapping));
592598
}
593599
@@ -598,13 +604,13 @@ export function transformStreamsMessagesReplyResp2(
598604
599605
for (let i=0; i < reply.length; i++) {
600606
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
601-
607+
602608
const name = stream[0] as unknown as UnwrapReply<BlobStringReply>;
603609
const rawMessages = stream[1];
604-
610+
605611
ret[name.toString()] = transformStreamMessagesReply(rawMessages);
606612
}
607-
613+
608614
return ret as unknown as MapReply<string, StreamMessagesReply>;
609615
}
610616
*/
@@ -630,7 +636,7 @@ type StreamsMessagesRawReply3 = MapReply<BlobStringReply, ArrayReply<StreamMessa
630636

631637
export function transformStreamsMessagesReplyResp3(reply: UnwrapReply<StreamsMessagesRawReply3 | NullReply>): MapReply<BlobStringReply, StreamMessagesReply> | NullReply {
632638
if (reply === null) return null as unknown as NullReply;
633-
639+
634640
if (reply instanceof Map) {
635641
const ret = new Map<string, StreamMessagesReply>();
636642

0 commit comments

Comments
 (0)