feat: batch enqueue, smart batching, and delivery batching (#3)

vieiralucas merged 1 commit into main
Conversation
add batchEnqueue() for explicit batch operations with per-message results. enqueue() now routes through an auto-batcher by default (setImmediate-based opportunistic batching). three batch modes: auto (default), linger (timer-based), and disabled. single-item optimization uses Enqueue RPC to preserve error types. consume stream transparently unpacks batched ConsumeResponse.messages field. close() drains pending messages before disconnecting.
4 issues found across 15 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="test/batch.test.ts">
<violation number="1" location="test/batch.test.ts:79">
P2: This test title claims to verify result ordering, but the assertions only verify that two IDs are different. Rename the test or add true order assertions to avoid false confidence.</violation>
</file>
<file name="test/batcher.unit.test.ts">
<violation number="1" location="test/batcher.unit.test.ts:10">
P2: `client.close()` now returns a Promise; these tests call it without `await`, so cleanup can run outside the test and hide async failures.</violation>
</file>
<file name="src/batcher.ts">
<violation number="1" location="src/batcher.ts:160">
P1: `drain()` resolves before in-flight RPCs complete. After `flushAll()` splices all items out of `pending`, the synchronous `notifyDrainComplete()` at the bottom fires immediately (since `pending.length === 0`), resolving the drain promise while `flushBatch()` calls are still in flight. `close()` will then tear down the gRPC channel before those RPCs finish.
Track in-flight flush promises and only notify drain when both `pending` is empty and all flushes have settled.</violation>
</file>
<file name="src/client.ts">
<violation number="1" location="src/client.ts:372">
P2: `doBatchEnqueue` does not validate that the server returned exactly one result per input message. If the server returns fewer results, the caller silently receives a short array, violating the documented contract. Consider adding a length check like the one in `Batcher.flushMultiple`.</violation>
</file>
```ts
const items = this.pending.splice(0, this.maxBatchSize);
// Fire-and-forget: flush concurrently.
this.flushBatch(items).then(() => {
  this.notifyDrainComplete();
});
```
P1: drain() resolves before in-flight RPCs complete. After flushAll() splices all items out of pending, the synchronous notifyDrainComplete() at the bottom fires immediately (since pending.length === 0), resolving the drain promise while flushBatch() calls are still in flight. close() will then tear down the gRPC channel before those RPCs finish.
Track in-flight flush promises and only notify drain when both pending is empty and all flushes have settled.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/batcher.ts, line 160:
<comment>`drain()` resolves before in-flight RPCs complete. After `flushAll()` splices all items out of `pending`, the synchronous `notifyDrainComplete()` at the bottom fires immediately (since `pending.length === 0`), resolving the drain promise while `flushBatch()` calls are still in flight. `close()` will then tear down the gRPC channel before those RPCs finish.
Track in-flight flush promises and only notify drain when both `pending` is empty and all flushes have settled.</comment>
<file context>
@@ -0,0 +1,266 @@
+ const items = this.pending.splice(0, this.maxBatchSize);
+ // Fire-and-forget: flush concurrently.
+ this.flushBatch(items).then(() => {
+ this.notifyDrainComplete();
+ });
+ }
</file context>
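The suggested fix can be sketched with a minimal, self-contained batcher. This is an illustration only: names like `pending`, `flushBatch`, and `drain` mirror the review comment, and the real `src/batcher.ts` differs. The idea is to track each in-flight flush promise so `drain()` resolves only when the queue is empty and every flush has settled.

```typescript
type Item = { body: string };

class Batcher {
  pending: Item[] = [];
  flushed: Item[] = []; // records completed flushes, for demonstration
  private inFlight = new Set<Promise<void>>();

  add(item: Item): void {
    this.pending.push(item);
  }

  private flushAll(): void {
    while (this.pending.length > 0) {
      const items = this.pending.splice(0, 100);
      // Track the flush promise so drain() can wait for it to settle.
      const p = this.flushBatch(items).finally(() => this.inFlight.delete(p));
      this.inFlight.add(p);
    }
  }

  private async flushBatch(items: Item[]): Promise<void> {
    // Placeholder for the real RPC; simulate async I/O latency.
    await new Promise((resolve) => setTimeout(resolve, 10));
    this.flushed.push(...items);
  }

  // Resolves only when pending is empty AND all in-flight flushes settled,
  // so close() can safely tear down the channel afterwards.
  async drain(): Promise<void> {
    this.flushAll();
    while (this.inFlight.size > 0) {
      await Promise.allSettled([...this.inFlight]);
    }
  }
}
```

Without the `inFlight` set, `drain()` would see `pending.length === 0` immediately after the splice and resolve while the RPCs are still running, which is exactly the failure mode the review describes.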
```ts
  }
});

it("returns message IDs in same order as input", async () => {
```
P2: This test title claims to verify result ordering, but the assertions only verify that two IDs are different. Rename the test or add true order assertions to avoid false confidence.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At test/batch.test.ts, line 79:
<comment>This test title claims to verify result ordering, but the assertions only verify that two IDs are different. Rename the test or add true order assertions to avoid false confidence.</comment>
<file context>
@@ -0,0 +1,304 @@
+ }
+ });
+
+ it("returns message IDs in same order as input", async () => {
+ await server.createQueue("batch-order");
+
</file context>
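A true order assertion checks every index against its expected ID, rather than merely asserting that two IDs differ. A minimal sketch of such a helper (hypothetical, not taken from the PR):

```typescript
type BatchResult = { messageId: string };

// True only when result i carries the ID expected for input i, for every i.
// A short or reordered result array fails the check.
function inInputOrder(expectedIds: string[], results: BatchResult[]): boolean {
  return (
    results.length === expectedIds.length &&
    results.every((r, i) => r.messageId === expectedIds[i])
  );
}
```

In the test this would compare the returned IDs position by position against whatever the server is known to assign per input, so a reordered response actually fails instead of passing on a coincidental inequality.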
```ts
// The batcher is initialized but won't do anything until enqueue is called.
const client = new Client("localhost:9999");
// close() should succeed even without a real server (just closes channel).
client.close();
```
P2: client.close() now returns a Promise; these tests call it without await, so cleanup can run outside the test and hide async failures.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At test/batcher.unit.test.ts, line 10:
<comment>`client.close()` now returns a Promise; these tests call it without `await`, so cleanup can run outside the test and hide async failures.</comment>
<file context>
@@ -0,0 +1,54 @@
+ // The batcher is initialized but won't do anything until enqueue is called.
+ const client = new Client("localhost:9999");
+ // close() should succeed even without a real server (just closes channel).
+ client.close();
+ });
+
</file context>
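Why the missing `await` matters can be shown with a stand-in client whose `close()` finishes draining on a later tick, as the real one now does (a sketch; the actual `Client` in `src/client.ts` differs):

```typescript
// Stand-in: close() completes its drain asynchronously, like the real client.
class MiniClient {
  closed = false;

  async close(): Promise<void> {
    // Simulated drain: resolves on a later event-loop turn.
    await new Promise<void>((resolve) => setImmediate(resolve));
    this.closed = true;
  }
}
```

Calling `client.close()` without `await` lets the test body return while the drain is still pending, so any failure during cleanup surfaces outside the test (or not at all); `await client.close()` guarantees cleanup finishes inside the test.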
```ts
  return;
}

const results: BatchEnqueueResult[] = resp!.results.map(
```
P2: doBatchEnqueue does not validate that the server returned exactly one result per input message. If the server returns fewer results, the caller silently receives a short array, violating the documented contract. Consider adding a length check like the one in Batcher.flushMultiple.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/client.ts, line 372:
<comment>`doBatchEnqueue` does not validate that the server returned exactly one result per input message. If the server returns fewer results, the caller silently receives a short array, violating the documented contract. Consider adding a length check like the one in `Batcher.flushMultiple`.</comment>
<file context>
@@ -237,11 +331,75 @@ export class Client {
+ return;
+ }
+
+ const results: BatchEnqueueResult[] = resp!.results.map(
+ (r: { result?: string; success?: { messageId?: string } | null; error?: string }) => {
+ if (r.result === "success" && r.success) {
</file context>
Suggested change:

```diff
-const results: BatchEnqueueResult[] = resp!.results.map(
+const rawResults = resp!.results;
+if (rawResults.length !== messages.length) {
+  reject(new RPCError(grpc.status.INTERNAL, `server returned ${rawResults.length} results for ${messages.length} messages`));
+  return;
+}
+const results: BatchEnqueueResult[] = rawResults.map(
```
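The same contract check can be expressed as a small standalone helper. This is a self-contained sketch: the `RPCError` shape here is simplified, and `INTERNAL` is hard-coded to the gRPC status value 13 rather than imported from `@grpc/grpc-js`.

```typescript
// Simplified stand-in for the gRPC error type used in the suggestion.
class RPCError extends Error {
  constructor(public code: number, message: string) {
    super(message);
    this.name = "RPCError";
  }
}

const INTERNAL = 13; // grpc.status.INTERNAL

// Enforce the one-result-per-message contract before mapping results,
// so a short server response fails loudly instead of silently truncating.
function checkResultCount<T>(inputs: readonly unknown[], results: T[]): T[] {
  if (results.length !== inputs.length) {
    throw new RPCError(
      INTERNAL,
      `server returned ${results.length} results for ${inputs.length} messages`,
    );
  }
  return results;
}
```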
Summary
- New `batchEnqueue()` method for explicit batch operations with per-message success/error results via the `BatchEnqueue` RPC
- `enqueue()` routes through an auto-batcher by default (opportunistic batching via `setImmediate` — zero latency at low load, natural batching at high load)
- Three batch modes: `auto` (default), `linger` (timer-based with `lingerMs`/`batchSize`), `disabled`
- Single-item optimization uses the `Enqueue` RPC to preserve typed errors (e.g., `QueueNotFoundError`)
- Consume stream unpacks the `ConsumeResponse.messages` repeated field, with backward-compatible fallback to the singular `message` field
- `close()` is now async and drains pending batched messages before disconnecting
- Proto: new `BatchEnqueue` RPC, `BatchEnqueueRequest`/`Response`/`Result` messages, and `ConsumeResponse.messages` repeated field

Test plan

- `batchEnqueue` with multiple messages, per-message errors, result ordering
- `close()` drain behavior
- Existing tests updated (`close()` → `await close()`) and still pass

Summary by cubic
Add explicit batch enqueue and smart, default batching for `enqueue()` to boost throughput with zero added latency at low load. Also add batched delivery in `consume()` and make `close()` drain pending work.

New Features
- `Client.batchEnqueue(messages)` returns per-message results via the `BatchEnqueue` RPC.
- `enqueue()` now uses an auto-batcher by default (event-loop clustering). Modes: `auto` (default), `linger` (timer-based), `disabled`.
- Single-item batches use the `Enqueue` RPC to preserve typed errors like `QueueNotFoundError`.
- `consume()` unpacks `ConsumeResponse.messages` when present, with fallback to `message`.
- `close()` is async and drains pending batched messages before disconnect.
- Proto: new `BatchEnqueue*` messages/RPC and `ConsumeResponse.messages`.

Migration
- Replace `client.close()` with `await client.close()`.
- To opt out of batching, set `batchMode: "disabled"` in `ClientOptions`. For timer-based batching, use `batchMode: "linger"` with `lingerMs` and `batchSize`.

Written for commit ef1ef5d.
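The migration options above can be sketched as a hypothetical type. The field names (`batchMode`, `lingerMs`, `batchSize`) come from this summary; the real `ClientOptions` in `src/client.ts` may differ in shape and defaults.

```typescript
type BatchMode = "auto" | "linger" | "disabled";

// Hypothetical batching-related subset of ClientOptions.
interface BatchingOptions {
  batchMode?: BatchMode; // default: "auto"
  lingerMs?: number;     // flush timer, only meaningful with "linger"
  batchSize?: number;    // max messages per BatchEnqueue RPC
}

// Opt out of batching to restore pre-change per-message behavior:
const disabled: BatchingOptions = { batchMode: "disabled" };

// Timer-based batching:
const linger: BatchingOptions = { batchMode: "linger", lingerMs: 5, batchSize: 100 };
```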