Skip to content

feat: replace grpc transport with fibp#6

Merged
vieiralucas merged 2 commits intomainfrom
feat/fibp-transport
Mar 26, 2026
Merged

feat: replace grpc transport with fibp#6
vieiralucas merged 2 commits intomainfrom
feat/fibp-transport

Conversation

@vieiralucas
Copy link
Copy Markdown
Member

@vieiralucas vieiralucas commented Mar 26, 2026

Summary

  • Replaces the gRPC transport with FIBP (Fila Binary Protocol) — a custom length-prefixed binary framing protocol over raw TCP
  • Drops google.golang.org/grpc dependency entirely; go.mod now requires only google.golang.org/protobuf (retained for admin op payloads per spec)
  • Public API is identical: Dial, Enqueue, Consume, Ack, Nack, accumulator modes, TLS options, API key auth all preserved

What changed

New files:

  • transport.go — FIBP frame codec ([4-byte len][flags|op|corr_id|payload]), TCP handshake (FIBP\x01\x00 echo), corr-id multiplexer, server-push routing for consume streams
  • admin.go — FIBP-based AdminClient (CreateQueue, DeleteQueue, ListQueues) with protobuf-encoded payloads per spec
  • enqueue_codec_test.go — unit tests for enqueue encoder/decoder roundtrip

Rewritten files:

  • client.gonet.Dial/tls.Dial instead of grpc.NewClient; API key sent as FIBP AUTH frame during handshake
  • enqueue.go — binary encode/decode for enqueue request/response; per-queue grouping for mixed-queue EnqueueMany
  • consume.go — binary consume request; decode pushed delivery frames (msg_id, fairness_key, queue, attempt_count, headers, payload); leader-redirect via FIBP error code
  • ack.go, nack.go — binary encode/decode for ack/nack
  • accumulator.go — operates on *conn directly; no filav1 service client dependency
  • errors.go — drop gRPC codes/status; add ErrConnectionClosed; RPCErrorProtocolError
  • Test helpers rewritten to use FIBP for server readiness polling (waitForFIBP) and queue creation (client.Admin().CreateQueue)
  • consume_test.go — proto-based unit tests replaced with FIBP frame codec tests

Deleted:

  • filav1/admin_grpc.pb.go, filav1/service_grpc.pb.go — gRPC service stubs no longer needed

Test plan

  • All 10 unit tests pass: go test -v ./... (codec roundtrips, error decoding, TLS config validation)
  • Integration tests (TestEnqueueConsumeAck, TestAPIKeyAuth, TLS tests, accumulator tests) execute correctly when FILA_SERVER_BIN points to a FIBP-enabled fila-server binary
  • go vet ./... clean
  • go.mod contains no gRPC dependency

🤖 Generated with Claude Code


Summary by cubic

Replaced the gRPC transport with FIBP (Fila Binary Protocol) over raw TCP, keeping the public API intact and removing the google.golang.org/grpc dependency. This reduces overhead and improves streaming; google.golang.org/protobuf is retained.

  • New Features

    • FIBP transport with length‑prefixed frames, handshake, and corr‑id multiplexing; Dial now opens TCP and completes the FIBP handshake eagerly.
    • Binary encode/decode for Enqueue, Consume, Ack, and Nack, including server‑push routing for consume and leader‑redirect via protocol error.
    • FIBP AdminClient using protobuf payloads (CreateQueue, DeleteQueue, ListQueues).
    • Consumer flow credits (default 256) to bound in‑flight messages.
  • Bug Fixes

    • conn.send now honors context deadlines/cancellation; callers pass through ctx (admin uses background).
    • Added 16 MiB max frame size guard to prevent runaway allocations.
    • Fixed pending response delivery race by holding the pending lock during send.
    • Added header count bounds check (>255) to avoid uint8 wrap.
    • Removed dead buildTestTLSConfig helper (lint).

Written for commit 71770c8. Summary will update on new commits.

rewrite the go sdk transport layer from grpc to fibp — a custom
length-prefixed binary framing protocol over raw tcp.

- transport.go: fibp frame codec, handshake, corr-id multiplexer, push
  channel routing for consume streams
- client.go: dial via net.Dial/tls.Dial instead of grpc.NewClient; auth
  via fibp auth frame instead of per-rpc metadata credentials
- enqueue.go: binary encode/decode for enqueue request/response
- consume.go: binary encode for consume request; decode pushed delivery
  frames; leader-hint redirect via fibp error code
- ack.go, nack.go: binary encode/decode for ack/nack request/response
- accumulator.go: operates on *conn directly, no filav1 dependency
- admin.go: new fibp-based admin client (createqueue, deletequeue,
  listqueues) using protobuf-encoded payloads per spec
- errors.go: drop grpc codes/status; add ErrConnectionClosed; replace
  RPCError with ProtocolError
- remove filav1/admin_grpc.pb.go and filav1/service_grpc.pb.go (grpc
  service stubs no longer needed; proto message types retained)
- drop google.golang.org/grpc dependency; go.mod now requires only
  google.golang.org/protobuf
- update tests to use fibp for server readiness polling and queue
  creation; replace grpc-based unit tests with fibp codec unit tests
- 10 unit tests pass; integration tests skip without server binary
vieiralucas added a commit to faiscadev/fila that referenced this pull request Mar 26, 2026
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 issues found across 18 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="enqueue.go">

<violation number="1" location="enqueue.go:66">
P1: `ctx` is accepted but never used — context cancellation and deadlines are silently ignored. Callers of `Enqueue` and `EnqueueMany` pass a context expecting it to control timeouts, but `c.send()` blocks indefinitely with no `select` on `ctx.Done()`.</violation>

<violation number="2" location="enqueue.go:145">
P2: Silent integer truncation: `uint8(len(msg.Headers))` wraps at 256, corrupting the wire frame. Add a bounds check and return an error if the header count exceeds 255.</violation>
</file>

<file name="transport.go">

<violation number="1" location="transport.go:147">
P1: Race condition: `readLoop` releases `streamsMu` before sending on `ch`, but `closeStream` can `close(ch)` between the unlock and the send, causing a panic on send-to-closed-channel. Since the send uses a non-blocking `select`, the simplest fix is to hold the lock across the send.</violation>

<violation number="2" location="transport.go:272">
P1: No maximum frame size check: `totalLen` is an untrusted `uint32` from the wire used directly in `make([]byte, totalLen)`. A malicious peer can trigger a ~4 GB allocation causing OOM. Add a reasonable upper bound (e.g., 16 MB) and reject oversized frames.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread enqueue.go
Comment thread transport.go
Comment thread transport.go
Comment thread enqueue.go
- wire ctx through conn.send() so callers respect deadline/cancellation
- add 16mib max frame size guard in readframe to prevent runaway allocs
- fix pending channel send race by holding pendingmu during delivery
- add header count bounds check (>255 returns error before u8 wrap)
- remove dead buildtesttlsconfig function (lint failure)

all callers of send() updated: enqueue, ack, nack, admin (background ctx)
@vieiralucas vieiralucas merged commit 51b673c into main Mar 26, 2026
2 of 3 checks passed
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 7 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="enqueue.go">

<violation number="1" location="enqueue.go:90">
P2: A validation error in a single message incorrectly fails all other valid messages in the same queue group. Validation should occur before grouping.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread enqueue.go

for _, q := range order {
g := groups[q]
payload, encErr := encodeEnqueueRequest(g.queue, g.messages)
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: A validation error in a single message incorrectly fails all other valid messages in the same queue group. Validation should occur before grouping.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At enqueue.go, line 90:

<comment>A validation error in a single message incorrectly fails all other valid messages in the same queue group. Validation should occur before grouping.</comment>

<file context>
@@ -87,9 +87,15 @@ func enqueueRaw(ctx context.Context, c *conn, messages []EnqueueMessage) ([]Enqu
 	for _, q := range order {
 		g := groups[q]
-		payload := encodeEnqueueRequest(g.queue, g.messages)
+		payload, encErr := encodeEnqueueRequest(g.queue, g.messages)
+		if encErr != nil {
+			for _, idx := range g.indices {
</file context>
Fix with Cubic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant