feat: python sdk batch operations & smart batching #3
Conversation
add batch_enqueue() for explicit multi-message RPCs, smart batching via the BatchMode enum (AUTO/DISABLED) and Linger dataclass that routes enqueue() through a background batcher thread, and delivery batching that unpacks the ConsumeResponse.messages repeated field. update proto to include the BatchEnqueue RPC and the ConsumeResponse batched messages field. single-item optimization uses the singular Enqueue RPC to preserve error types. close() drains pending messages before disconnecting.
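A minimal usage sketch of the API described above. The server address and the exact constructor and `enqueue()` signatures are assumptions; `batch_enqueue()`, `BatchMode`, and the `close()` drain semantics come from this PR.

```python
from fila import BatchMode, Client

client = Client("localhost:50051")  # BatchMode.AUTO is the default

# enqueue() is transparently routed through the background batcher thread.
# A lone message takes the singular Enqueue RPC, preserving error types.
client.enqueue("orders", b"payload-1")

# Explicit multi-message RPC: one BatchEnqueue call, one BatchEnqueueResult
# per input message.
results = client.batch_enqueue("orders", [b"a", b"b", b"c"])

client.close()  # drains any pending batched messages before disconnecting
```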
5 issues found across 13 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="fila/errors.py">
<violation number="1" location="fila/errors.py:75">
P2: `_map_batch_enqueue_error()` does not return `BatchEnqueueError` for RPC-level batch failures, despite the new exception being documented for exactly that case.</violation>
</file>
<file name="fila/v1/service_pb2_grpc.py">
<violation number="1" location="fila/v1/service_pb2_grpc.py:8">
P2: This generated version pin raises the SDK’s runtime minimum to grpcio 1.78.1, which can break users on the declared supported range (>=1.60.0) at import time.</violation>
</file>
<file name="fila/v1/messages_pb2_grpc.py">
<violation number="1" location="fila/v1/messages_pb2_grpc.py:7">
P2: This change raises the minimum runtime grpcio version to 1.78.1 and can break imports for users on 1.78.0 due to the generated runtime version guard.</violation>
</file>
<file name="fila/batcher.py">
<violation number="1" location="fila/batcher.py:102">
P1: Futures left permanently unresolved when server returns fewer results than batch size. If `len(resp.results) < len(batch)`, the remaining futures never get a result or exception, causing callers to block forever on `.result()`. Add a fallback after the loop to fail any unmatched futures.</violation>
</file>
<file name="tests/test_batcher.py">
<violation number="1" location="tests/test_batcher.py:243">
P2: This assertion is too weak for a batching behavior test; it can pass even when messages are not batched.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
fila/batcher.py, line 102:

```python
            return

        # Pair each result with its request future.
        for i, result in enumerate(resp.results):
            if i >= len(batch):
                break
```

P1: Futures are left permanently unresolved when the server returns fewer results than the batch size. If `len(resp.results) < len(batch)`, the remaining futures never get a result or exception, causing callers to block forever on `.result()`. Add a fallback after the loop to fail any unmatched futures.
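One way the suggested fallback could look: a sketch assuming the batcher keeps a `futures` list parallel to `batch` (those names are illustrative), with `BatchEnqueueError` being the exception this PR introduces.

```python
# Pair each result with its request future (existing loop).
for i, result in enumerate(resp.results):
    if i >= len(batch):
        break
    futures[i].set_result(result)

# Fallback: fail any future the server returned no result for, so callers
# blocked on .result() raise instead of hanging forever.
for fut in futures[len(resp.results):]:
    if not fut.done():
        fut.set_exception(
            BatchEnqueueError(
                f"server returned {len(resp.results)} results "
                f"for a batch of {len(batch)} messages"
            )
        )
```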
fila/errors.py, line 75:

```python
    code = err.code()
    if code == grpc.StatusCode.NOT_FOUND:
        return QueueNotFoundError(f"batch_enqueue: {err.details()}")
    return RPCError(code, err.details() or "")
```

P2: `_map_batch_enqueue_error()` does not return `BatchEnqueueError` for RPC-level batch failures, despite the new exception being documented for exactly that case.
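A sketch of the mapping the comment asks for, assuming `BatchEnqueueError` accepts a message string (its actual constructor is not shown in this diff):

```python
import grpc

def _map_batch_enqueue_error(err: grpc.RpcError) -> FilaError:
    code = err.code()
    if code == grpc.StatusCode.NOT_FOUND:
        return QueueNotFoundError(f"batch_enqueue: {err.details()}")
    # Wrap other RPC-level batch failures in the documented batch exception
    # rather than the generic RPCError.
    return BatchEnqueueError(f"batch_enqueue ({code.name}): {err.details() or ''}")
```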
fila/v1/service_pb2_grpc.py, line 8:

```diff
 from fila.v1 import service_pb2 as fila_dot_v1_dot_service__pb2

-GRPC_GENERATED_VERSION = '1.78.0'
+GRPC_GENERATED_VERSION = '1.78.1'
 GRPC_VERSION = grpc.__version__
 _version_not_supported = False
```

P2: This generated version pin raises the SDK's runtime minimum to grpcio 1.78.1, which can break users on the declared supported range (>=1.60.0) at import time.
fila/v1/messages_pb2_grpc.py, line 7:

```diff
-GRPC_GENERATED_VERSION = '1.78.0'
+GRPC_GENERATED_VERSION = '1.78.1'
 GRPC_VERSION = grpc.__version__
 _version_not_supported = False
```

P2: This change raises the minimum runtime grpcio version to 1.78.1 and can break imports for users on 1.78.0 due to the generated runtime version guard.
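Both comments point at the same mechanism. The guard that grpcio-tools emits into `*_pb2_grpc.py` looks roughly like the sketch below (paraphrased from generated output; exact wording varies by release): the module raises at import time when the installed grpcio is older than `GRPC_GENERATED_VERSION`, so a pin of 1.78.1 contradicts the declared `grpcio>=1.60.0` floor. Regenerating the stubs with a grpcio-tools version that matches the declared floor avoids the conflict.

```python
import grpc

GRPC_GENERATED_VERSION = '1.78.1'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
    from grpc._utilities import first_version_is_lower
    _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
    _version_not_supported = True

if _version_not_supported:
    # Raised at import time: e.g. grpcio 1.78.0 < generated 1.78.1.
    raise RuntimeError(
        f"The grpc package installed is at version {GRPC_VERSION}, but the "
        f"generated code requires grpcio>={GRPC_GENERATED_VERSION}."
    )
```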
tests/test_batcher.py, line 243:

```python
    # Either BatchEnqueue or multiple Enqueue calls will resolve things.
    for _i, f in enumerate(futures):
        result = f.result(timeout=5.0)
        assert result is not None

    batcher.close()
```

P2: This assertion is too weak for a batching behavior test; it can pass even when messages are not batched.
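A sketch of a stricter assertion, assuming the test's fake stub records its calls (`stub` and its counter attributes are illustrative names, not part of this PR):

```python
# Resolve all futures as before.
for fut in futures:
    assert fut.result(timeout=5.0) is not None

# Then assert the messages actually went out as a batch: one BatchEnqueue
# RPC and zero singular Enqueue RPCs for this group of messages.
assert stub.batch_enqueue_calls == 1
assert stub.enqueue_calls == 0
assert stub.last_batch_size == len(futures)

batcher.close()
```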
Summary

- `batch_enqueue()` method for explicit multi-message RPCs on both sync and async clients
- `BatchMode` enum (AUTO/DISABLED) and `Linger` dataclass, routing `enqueue()` through a background batcher thread by default
- `ConsumeResponse.messages` repeated field with backward-compatible fallback to the singular `message` field
- single-item optimization uses the singular `Enqueue` RPC to preserve error types (`QueueNotFoundError` vs `BatchEnqueueError`)
- `close()` drains pending batched messages before disconnecting
- proto updated with the `BatchEnqueue` RPC and `ConsumeResponse.messages` repeated field

Test plan

- `_flush_single` and `_flush_batch` with mock stubs
- `AutoBatcher` (single message, concurrent batching, close drains, stub update)
- `LingerBatcher` (batch size trigger, linger timeout, close drains)
- `BatchMode`, `Linger`, `BatchEnqueueResult` types
- `batch_enqueue()` (multiple messages, single message, consume verification)
- `batch_enqueue()`

Generated with Claude Code
Summary by cubic

Add explicit `batch_enqueue()` to the Python SDK and enable smart batching by default (AUTO) for `enqueue()`. Also add batched delivery in `consume()` and update the proto with `BatchEnqueue` and `ConsumeResponse.messages`.

New Features

- `Client.batch_enqueue()` and `AsyncClient.batch_enqueue()` send many messages in one RPC and return a per-message `BatchEnqueueResult`.
- `BatchMode` (`AUTO` default, `DISABLED`) and `Linger`; background batcher with single-item optimization; `close()` drains pending messages; reconnect updates the batcher stub.
- `consume()` transparently iterates `ConsumeResponse.messages` with fallback to the singular `message`.
- Proto: `BatchEnqueue` RPC and `ConsumeResponse.messages`; new `BatchEnqueueError`; public exports updated.

Migration

- `enqueue()` now routes through a background batcher by default; disable with `batch_mode=BatchMode.DISABLED`, or use `Linger(linger_ms, batch_size)` for timer-based batching.
- Call `close()` to flush any pending batched messages before shutdown.

Written for commit dcac3da. Summary will update on new commits.
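A migration sketch based on the notes above; only `batch_mode`, `BatchMode.DISABLED`, and `Linger(linger_ms, batch_size)` are named by this PR, and the rest of the constructor is an assumption.

```python
from fila import BatchMode, Client, Linger

# Opt out of smart batching to keep one Enqueue RPC per enqueue() call.
client = Client("localhost:50051", batch_mode=BatchMode.DISABLED)

# Or batch on a timer: flush after 5 ms, or sooner once 64 messages queue up.
client = Client("localhost:50051", batch_mode=Linger(linger_ms=5, batch_size=64))

# close() flushes any pending batched messages before shutdown.
client.close()
```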