Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

JavaScript/TypeScript client SDK for the [Fila](https://github.com/faisca/fila) message broker.

Uses the **FIBP** (Fila Binary Protocol) transport — a length-prefixed binary framing protocol over raw TCP, replacing the previous gRPC transport. No protobuf runtime required on the hot path.

## Installation

```bash
Expand Down Expand Up @@ -35,15 +37,15 @@ for await (const msg of client.consume("my-queue")) {
}
}

client.close();
await client.close();
```

### TLS (system trust store)

If the Fila server uses a certificate signed by a public CA, enable TLS without providing a CA certificate — the OS system trust store is used automatically:

```typescript
import { Client } from "@fila/client";
import { Client } from "fila-client";

const client = new Client("localhost:5555", { tls: true });
```
Expand All @@ -67,7 +69,7 @@ Client certificates work with both modes — system trust store or custom CA:

```typescript
import * as fs from "fs";
import { Client } from "@fila/client";
import { Client } from "fila-client";

// With custom CA:
const client = new Client("localhost:5555", {
Expand All @@ -87,7 +89,7 @@ const client2 = new Client("localhost:5555", {
### API key authentication

```typescript
import { Client } from "@fila/client";
import { Client } from "fila-client";

const client = new Client("localhost:5555", {
apiKey: "my-api-key",
Expand All @@ -98,7 +100,7 @@ const client = new Client("localhost:5555", {

```typescript
import * as fs from "fs";
import { Client } from "@fila/client";
import { Client } from "fila-client";

const client = new Client("localhost:5555", {
caCert: fs.readFileSync("ca.pem"),
Expand All @@ -112,21 +114,29 @@ const client = new Client("localhost:5555", {

### `new Client(addr: string, options?: ClientOptions)`

Connect to a Fila broker at the given address (e.g., `"localhost:5555"`).
Connect to a Fila broker at the given address (e.g., `"localhost:5555"`). The TCP connection and FIBP handshake are established lazily on the first operation.

**Options:**

| Option | Type | Description |
|-------------|-----------|---------------------------------------------------------------------|
| `tls` | `boolean` | Enable TLS using the OS system trust store. Implied when `caCert` is set. |
| `caCert` | `Buffer` | CA certificate PEM. Enables TLS with a custom CA when set. |
| `clientCert`| `Buffer` | Client certificate PEM for mTLS. Requires TLS to be enabled. |
| `clientKey` | `Buffer` | Client private key PEM for mTLS. Requires TLS to be enabled. |
| `apiKey` | `string` | API key sent as `Bearer` token on every RPC call. |
| Option | Type | Description |
|----------------|-----------|-----------------------------------------------------------------------------------|
| `tls` | `boolean` | Enable TLS using the OS system trust store. Implied when `caCert` is set. |
| `caCert` | `Buffer` | CA certificate PEM. Enables TLS with a custom CA when set. |
| `clientCert` | `Buffer` | Client certificate PEM for mTLS. Requires TLS to be enabled. |
| `clientKey` | `Buffer` | Client private key PEM for mTLS. Requires TLS to be enabled. |
| `apiKey` | `string` | API key sent as an AUTH frame immediately after the FIBP handshake. |
| `batchMode` | `string` | `'auto'` (default), `'linger'`, or `'disabled'`. Controls enqueue batching. |
| `maxBatchSize` | `number` | Maximum batch size for auto mode. Default: 100. |
| `lingerMs` | `number` | Linger timeout in ms for linger mode. Required when `batchMode` is `'linger'`. |
| `batchSize` | `number` | Maximum batch size for linger mode. Required when `batchMode` is `'linger'`. |

### `client.enqueue(queue, headers, payload): Promise<string>`

Enqueue a message. Returns the broker-assigned message ID (UUIDv7).
Enqueue a message. Returns the broker-assigned message ID (UUIDv7). When batching is enabled (default `'auto'`), messages are opportunistically grouped for throughput.

### `client.enqueueMany(messages): Promise<EnqueueResult[]>`

Enqueue multiple messages in a single request, bypassing the batcher. Returns one result per input message in the same order.

### `client.consume(queue): AsyncIterable<ConsumeMessage>`

Expand All @@ -140,34 +150,54 @@ Acknowledge a successfully processed message. The message is permanently removed

Negatively acknowledge a failed message. The message is requeued or routed to the dead-letter queue based on the queue's configuration.

### `client.close(): void`
### `client.close(): Promise<void>`

Close the underlying gRPC channel.
Drain any pending batched messages and close the TCP connection.

## Error Handling

Per-operation error classes are thrown for specific failure modes:

```typescript
import { QueueNotFoundError, MessageNotFoundError } from "fila-client";
import {
QueueNotFoundError,
MessageNotFoundError,
UnauthenticatedError,
RPCError,
} from "fila-client";

try {
await client.enqueue("missing-queue", null, Buffer.from("test"));
} catch (err) {
if (err instanceof QueueNotFoundError) {
// handle queue not found
// queue does not exist
} else if (err instanceof UnauthenticatedError) {
// invalid or missing API key
} else if (err instanceof RPCError) {
console.error("wire error code:", err.code);
}
}

try {
await client.ack("my-queue", "missing-id");
} catch (err) {
if (err instanceof MessageNotFoundError) {
// handle message not found
// message already acknowledged or expired
}
}
```

## Protocol

Fila uses **FIBP** (Fila Binary Protocol) — a lightweight binary framing protocol over raw TCP (or TLS):

- **Frame format**: `[4-byte big-endian length][flags:u8 | op:u8 | corr_id:u32 | payload]`
- **Handshake**: client sends `FIBP\x01\x00`; server echoes the same 6 bytes
- **Hot-path ops** (enqueue, consume, ack, nack): custom binary encoding, no protobuf overhead
- **Admin ops**: protobuf-encoded payloads for schema flexibility
- **Authentication**: API key sent in an AUTH frame immediately after the handshake
- **Multiplexing**: correlation IDs map responses to outstanding requests on the same connection

## License

AGPLv3
50 changes: 0 additions & 50 deletions generated/admin.ts

This file was deleted.

13 changes: 0 additions & 13 deletions generated/fila/v1/AckError.ts

This file was deleted.

20 changes: 0 additions & 20 deletions generated/fila/v1/AckErrorCode.ts

This file was deleted.

12 changes: 0 additions & 12 deletions generated/fila/v1/AckMessage.ts

This file was deleted.

11 changes: 0 additions & 11 deletions generated/fila/v1/AckRequest.ts

This file was deleted.

11 changes: 0 additions & 11 deletions generated/fila/v1/AckResponse.ts

This file was deleted.

16 changes: 0 additions & 16 deletions generated/fila/v1/AckResult.ts

This file was deleted.

8 changes: 0 additions & 8 deletions generated/fila/v1/AckSuccess.ts

This file was deleted.

12 changes: 0 additions & 12 deletions generated/fila/v1/AclPermission.ts

This file was deleted.

19 changes: 0 additions & 19 deletions generated/fila/v1/ApiKeyInfo.ts

This file was deleted.

12 changes: 0 additions & 12 deletions generated/fila/v1/ConfigEntry.ts

This file was deleted.

10 changes: 0 additions & 10 deletions generated/fila/v1/ConsumeRequest.ts

This file was deleted.

11 changes: 0 additions & 11 deletions generated/fila/v1/ConsumeResponse.ts

This file was deleted.

15 changes: 0 additions & 15 deletions generated/fila/v1/CreateApiKeyRequest.ts

This file was deleted.

Loading
Loading