From 34a9aa20c1d06ca5f04b78e00862e63ab0d47616 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 30 Oct 2025 14:00:05 +0200 Subject: [PATCH 1/3] feat(xreadgroup): add claim attribute the CLAIM attribute can be used to instruct redis to return PEL ( Pending Entries List ) entries with their respective deliveries and ms since last delivery --- .../client/lib/commands/XREADGROUP.spec.ts | 100 +++++++++++++----- packages/client/lib/commands/XREADGROUP.ts | 8 +- .../lib/commands/generic-transformers.ts | 44 ++++---- 3 files changed, 104 insertions(+), 48 deletions(-) diff --git a/packages/client/lib/commands/XREADGROUP.spec.ts b/packages/client/lib/commands/XREADGROUP.spec.ts index acc7cc2dea..39c7c70d67 100644 --- a/packages/client/lib/commands/XREADGROUP.spec.ts +++ b/packages/client/lib/commands/XREADGROUP.spec.ts @@ -93,6 +93,33 @@ describe('XREADGROUP', () => { ['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'STREAMS', 'key', '0-0'] ); }); + + it('with CLAIM', () => { + assert.deepEqual( + parseArgs(XREADGROUP, 'group', 'consumer', { + key: 'key', + id: '0-0' + }, { + CLAIM: 100 + }), + ['XREADGROUP', 'GROUP', 'group', 'consumer', 'CLAIM', '100', 'STREAMS', 'key', '0-0'] + ); + }); + + it('with COUNT, BLOCK, NOACK, CLAIM', () => { + assert.deepEqual( + parseArgs(XREADGROUP, 'group', 'consumer', { + key: 'key', + id: '0-0' + }, { + COUNT: 1, + BLOCK: 0, + NOACK: true, + CLAIM: 100 + }), + ['XREADGROUP', 'GROUP', 'group', 'consumer', 'COUNT', '1', 'BLOCK', '0', 'NOACK', 'CLAIM', '100', 'STREAMS', 'key', '0-0'] + ); + }); }); testUtils.testAll('xReadGroup - null', async client => { @@ -156,35 +183,54 @@ describe('XREADGROUP', () => { cluster: GLOBAL.CLUSTERS.OPEN }); - testUtils.testWithClient('client.xReadGroup should throw with resp3 and unstableResp3: false', async client => { - assert.throws( - () => client.xReadGroup('group', 'consumer', { - key: 'key', - id: '>' + testUtils.testAll('xReadGroup - without CLAIM should not include delivery fields', async client => { + const [, id] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true }), - { - message: 'Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance' - } - ); - }, { - ...GLOBAL.SERVERS.OPEN, - clientOptions: { - RESP: 3 - } - }); + client.xAdd('key', '*', { field: 'value' }) + ]); - testUtils.testWithClient('client.xReadGroup should not throw with resp3 and unstableResp3: true', async client => { - assert.doesNotThrow( - () => client.xReadGroup('group', 'consumer', { - key: 'key', - id: '>' - }) - ); + const readGroupReply = await client.xReadGroup('group', 'consumer', { + key: 'key', + id: '>' + }); + + assert.ok(readGroupReply); + assert.equal(readGroupReply[0].messages[0].millisElapsedFromDelivery, undefined); + assert.equal(readGroupReply[0].messages[0].deliveriesCounter, undefined); }, { - ...GLOBAL.SERVERS.OPEN, - clientOptions: { - RESP: 3, - unstableResp3: true - } + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN }); + + testUtils.testWithClientIfVersionWithinRange([[8,4], 'LATEST'],'xReadGroup - with CLAIM should include delivery fields', async client => { + const [, id] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xAdd('key', '*', { field: 'value' }) + ]); + + // First read to add message to PEL + await client.xReadGroup('group', 'consumer', { + key: 'key', + id: '>' + }); + + // Read with CLAIM to get delivery fields + const readGroupReply = await client.xReadGroup('group', 'consumer2', { + key: 'key', + id: '>' + }, { + CLAIM: 0 + }); + + assert.ok(readGroupReply); + assert.equal(readGroupReply[0].messages[0].id, id); + assert.ok(readGroupReply[0].messages[0].millisElapsedFromDelivery !== undefined); + assert.ok(readGroupReply[0].messages[0].deliveriesCounter !== undefined); + assert.equal(typeof readGroupReply[0].messages[0].millisElapsedFromDelivery, 'number'); + assert.equal(typeof readGroupReply[0].messages[0].deliveriesCounter, 'number'); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XREADGROUP.ts b/packages/client/lib/commands/XREADGROUP.ts index b274aab95f..92ebd52927 100644 --- a/packages/client/lib/commands/XREADGROUP.ts +++ b/packages/client/lib/commands/XREADGROUP.ts @@ -5,7 +5,7 @@ import { transformStreamsMessagesReplyResp2 } from './generic-transformers'; /** * Options for the XREADGROUP command - * + * * @property COUNT - Limit the number of entries returned per stream * @property BLOCK - Milliseconds to block waiting for new entries (0 for indefinite) * @property NOACK - Skip adding the message to the PEL (Pending Entries List) @@ -14,6 +14,7 @@ export interface XReadGroupOptions { COUNT?: number; BLOCK?: number; NOACK?: boolean; + CLAIM?: number; } export default { @@ -50,6 +51,10 @@ export default { parser.push('NOACK'); } + if (options?.CLAIM !== undefined) { + parser.push('CLAIM', options.CLAIM.toString()); + } + pushXReadStreams(parser, streams); }, /** @@ -59,5 +64,4 @@ export default { 2: transformStreamsMessagesReplyResp2, 3: undefined as unknown as () => ReplyUnion }, - unstableResp3: true, } as const satisfies Command; diff --git a/packages/client/lib/commands/generic-transformers.ts b/packages/client/lib/commands/generic-transformers.ts index 022339e4bb..56e99c28de 100644 --- a/packages/client/lib/commands/generic-transformers.ts +++ b/packages/client/lib/commands/generic-transformers.ts @@ -46,7 +46,7 @@ export function transformStringDoubleArgument(num: RedisArgument | number): Redi export const transformDoubleReply = { 2: (reply: BlobStringReply, preserve?: any, typeMapping?: TypeMapping): DoubleReply => { const double = typeMapping ? typeMapping[RESP_TYPES.DOUBLE] : undefined; - + switch (double) { case String: { return reply as unknown as DoubleReply; @@ -58,13 +58,13 @@ export const transformDoubleReply = { case 'inf': case '+inf': ret = Infinity; - + case '-inf': ret = -Infinity; - + case 'nan': ret = NaN; - + default: ret = Number(reply); } @@ -98,7 +98,7 @@ export function createTransformNullableDoubleReplyResp2Func(preserve?: any, type export const transformNullableDoubleReply = { 2: (reply: BlobStringReply | NullReply, preserve?: any, typeMapping?: TypeMapping) => { if (reply === null) return null; - + return transformDoubleReply[2](reply as BlobStringReply, preserve, typeMapping); }, 3: undefined as unknown as () => DoubleReply | NullReply @@ -514,19 +514,25 @@ export function parseArgs(command: Command, ...args: Array): CommandArgumen export type StreamMessageRawReply = TuplesReply<[ id: BlobStringReply, - message: ArrayReply + message: ArrayReply, + millisElapsedFromDelivery?: NumberReply, + deliveriesCounter?: NumberReply ]>; export type StreamMessageReply = { id: BlobStringReply, message: MapReply, + millisElapsedFromDelivery?: number + deliveriesCounter?: number }; export function transformStreamMessageReply(typeMapping: TypeMapping | undefined, reply: StreamMessageRawReply): StreamMessageReply { - const [ id, message ] = reply as unknown as UnwrapReply; + const [ id, message, millisElapsedFromDelivery, deliveriesCounter ] = reply as unknown as UnwrapReply; return { id: id, - message: transformTuplesReply(message, undefined, typeMapping) + message: transformTuplesReply(message, undefined, typeMapping), + ...(millisElapsedFromDelivery !== undefined ? { millisElapsedFromDelivery: Number(millisElapsedFromDelivery) } : {}), + ...(deliveriesCounter !== undefined ? { deliveriesCounter: Number(deliveriesCounter) } : {}) }; } @@ -557,7 +563,7 @@ export function transformStreamsMessagesReplyResp2( reply: UnwrapReply, preserve?: any, typeMapping?: TypeMapping -): StreamsMessagesReply | NullReply { +): StreamsMessagesReply | NullReply { // FUTURE: resposne type if resp3 was working, reverting to old v4 for now //: MapReply | NullReply { if (reply === null) return null as unknown as NullReply; @@ -569,13 +575,13 @@ export function transformStreamsMessagesReplyResp2( for (let i=0; i < reply.length; i++) { const stream = reply[i] as unknown as UnwrapReply; - + const name = stream[0]; const rawMessages = stream[1]; - + ret.set(name.toString(), transformStreamMessagesReply(rawMessages, typeMapping)); } - + return ret as unknown as MapReply; } case Array: { @@ -583,11 +589,11 @@ export function transformStreamsMessagesReplyResp2( for (let i=0; i < reply.length; i++) { const stream = reply[i] as unknown as UnwrapReply; - + const name = stream[0]; const rawMessages = stream[1]; - - ret.push(name); + + ret.push(name); ret.push(transformStreamMessagesReply(rawMessages, typeMapping)); } @@ -598,13 +604,13 @@ export function transformStreamsMessagesReplyResp2( for (let i=0; i < reply.length; i++) { const stream = reply[i] as unknown as UnwrapReply; - + const name = stream[0] as unknown as UnwrapReply; const rawMessages = stream[1]; - + ret[name.toString()] = transformStreamMessagesReply(rawMessages); } - + return ret as unknown as MapReply; } */ @@ -630,7 +636,7 @@ type StreamsMessagesRawReply3 = MapReply): MapReply | NullReply { if (reply === null) return null as unknown as NullReply; - + if (reply instanceof Map) { const ret = new Map(); From c2412494b5ede39e10c5d44dac3dc7f953b59431 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 31 Oct 2025 15:14:27 +0200 Subject: [PATCH 2/3] remove m01 from test matrix --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2e3a91f2c8..bf16dd8dec 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -22,7 +22,7 @@ jobs: fail-fast: false matrix: node-version: ["18", "20", "22"] - redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-M01-pre", "8.4-RC1-pre"] + redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2", "8.4-RC1-pre"] steps: - uses: actions/checkout@v4 with: From 6e5d0fd009952dedbee49868fc9fc6bab09697af Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 3 Nov 2025 11:35:52 +0200 Subject: [PATCH 3/3] add jsdoc --- packages/client/lib/commands/XREADGROUP.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/client/lib/commands/XREADGROUP.ts b/packages/client/lib/commands/XREADGROUP.ts index 92ebd52927..d177c2e486 100644 --- a/packages/client/lib/commands/XREADGROUP.ts +++ b/packages/client/lib/commands/XREADGROUP.ts @@ -9,6 +9,7 @@ import { transformStreamsMessagesReplyResp2 } from './generic-transformers'; * @property COUNT - Limit the number of entries returned per stream * @property BLOCK - Milliseconds to block waiting for new entries (0 for indefinite) * @property NOACK - Skip adding the message to the PEL (Pending Entries List) + * @property CLAIM - Prepend PEL entries that are at least this many milliseconds old */ export interface XReadGroupOptions { COUNT?: number;