diff --git a/package.json b/package.json index c59f541..b869f69 100644 --- a/package.json +++ b/package.json @@ -9,8 +9,7 @@ "main": "dist/src/index.js", "types": "dist/src/index.d.ts", "files": [ - "dist/", - "proto/" + "dist/" ], "scripts": { "build": "tsc", @@ -19,9 +18,7 @@ "test": "vitest run" }, "license": "AGPL-3.0-or-later", - "dependencies": { - "protobufjs": "^7.5.0" - }, + "dependencies": {}, "devDependencies": { "@types/node": "^22.0.0", "eslint": "^9.0.0", diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto deleted file mode 100644 index 886e58d..0000000 --- a/proto/fila/v1/admin.proto +++ /dev/null @@ -1,197 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -// Admin RPCs for operators and the CLI. -service FilaAdmin { - rpc CreateQueue(CreateQueueRequest) returns (CreateQueueResponse); - rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); - rpc SetConfig(SetConfigRequest) returns (SetConfigResponse); - rpc GetConfig(GetConfigRequest) returns (GetConfigResponse); - rpc ListConfig(ListConfigRequest) returns (ListConfigResponse); - rpc GetStats(GetStatsRequest) returns (GetStatsResponse); - rpc Redrive(RedriveRequest) returns (RedriveResponse); - rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - - // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); - rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); - rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); - - // Per-key ACL management. - rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); -} - -message CreateQueueRequest { - string name = 1; - QueueConfig config = 2; -} - -message QueueConfig { - string on_enqueue_script = 1; - string on_failure_script = 2; - uint64 visibility_timeout_ms = 3; -} - -message CreateQueueResponse { - string queue_id = 1; -} - -message DeleteQueueRequest { - string queue = 1; -} - -message DeleteQueueResponse {} - -message SetConfigRequest { - string key = 1; - string value = 2; -} - -message SetConfigResponse {} - -message GetConfigRequest { - string key = 1; -} - -message GetConfigResponse { - string value = 1; -} - -message ConfigEntry { - string key = 1; - string value = 2; -} - -message ListConfigRequest { - string prefix = 1; -} - -message ListConfigResponse { - repeated ConfigEntry entries = 1; - uint32 total_count = 2; -} - -message GetStatsRequest { - string queue = 1; -} - -message PerFairnessKeyStats { - string key = 1; - uint64 pending_count = 2; - int64 current_deficit = 3; - uint32 weight = 4; -} - -message PerThrottleKeyStats { - string key = 1; - double tokens = 2; - double rate_per_second = 3; - double burst = 4; -} - -message GetStatsResponse { - uint64 depth = 1; - uint64 in_flight = 2; - uint64 active_fairness_keys = 3; - uint32 active_consumers = 4; - uint32 quantum = 5; - repeated PerFairnessKeyStats per_key_stats = 6; - repeated PerThrottleKeyStats per_throttle_stats = 7; - // Cluster fields (0 when not in cluster mode). - uint64 leader_node_id = 8; - uint32 replication_count = 9; -} - -message RedriveRequest { - string dlq_queue = 1; - uint64 count = 2; -} - -message RedriveResponse { - uint64 redriven = 1; -} - -message ListQueuesRequest {} - -message QueueInfo { - string name = 1; - uint64 depth = 2; - uint64 in_flight = 3; - uint32 active_consumers = 4; - uint64 leader_node_id = 5; -} - -message ListQueuesResponse { - repeated QueueInfo queues = 1; - uint32 cluster_node_count = 2; -} - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. -message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). - string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/messages.proto b/proto/fila/v1/messages.proto deleted file mode 100644 index a0709cf..0000000 --- a/proto/fila/v1/messages.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "google/protobuf/timestamp.proto"; - -// Core message envelope persisted in the broker. -message Message { - string id = 1; - map headers = 2; - bytes payload = 3; - MessageMetadata metadata = 4; - MessageTimestamps timestamps = 5; -} - -// Broker-assigned scheduling metadata. -message MessageMetadata { - string fairness_key = 1; - uint32 weight = 2; - repeated string throttle_keys = 3; - uint32 attempt_count = 4; - string queue_id = 5; -} - -// Lifecycle timestamps attached to every message. -message MessageTimestamps { - google.protobuf.Timestamp enqueued_at = 1; - google.protobuf.Timestamp leased_at = 2; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto deleted file mode 100644 index 7d1db79..0000000 --- a/proto/fila/v1/service.proto +++ /dev/null @@ -1,142 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "fila/v1/messages.proto"; - -// Hot-path RPCs for producers and consumers. -service FilaService { - rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); - rpc Ack(AckRequest) returns (AckResponse); - rpc Nack(NackRequest) returns (NackResponse); -} - -// Individual message to enqueue. -message EnqueueMessage { - string queue = 1; - map headers = 2; - bytes payload = 3; -} - -// Enqueue one or more messages. -message EnqueueRequest { - repeated EnqueueMessage messages = 1; -} - -// Per-message enqueue result. -message EnqueueResult { - oneof result { - string message_id = 1; - EnqueueError error = 2; - } -} - -// Typed enqueue error with structured error code. -message EnqueueError { - EnqueueErrorCode code = 1; - string message = 2; -} - -enum EnqueueErrorCode { - ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; - ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; - ENQUEUE_ERROR_CODE_STORAGE = 2; - ENQUEUE_ERROR_CODE_LUA = 3; - ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; -} - -// One result per input message. -message EnqueueResponse { - repeated EnqueueResult results = 1; -} - -message ConsumeRequest { - string queue = 1; -} - -message ConsumeResponse { - repeated Message messages = 1; -} - -// Individual ack item. -message AckMessage { - string queue = 1; - string message_id = 2; -} - -message AckRequest { - repeated AckMessage messages = 1; -} - -message AckResult { - oneof result { - AckSuccess success = 1; - AckError error = 2; - } -} - -message AckSuccess {} - -message AckError { - AckErrorCode code = 1; - string message = 2; -} - -enum AckErrorCode { - ACK_ERROR_CODE_UNSPECIFIED = 0; - ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - ACK_ERROR_CODE_STORAGE = 2; - ACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message AckResponse { - repeated AckResult results = 1; -} - -// Individual nack item. -message NackMessage { - string queue = 1; - string message_id = 2; - string error = 3; -} - -message NackRequest { - repeated NackMessage messages = 1; -} - -message NackResult { - oneof result { - NackSuccess success = 1; - NackError error = 2; - } -} - -message NackSuccess {} - -message NackError { - NackErrorCode code = 1; - string message = 2; -} - -enum NackErrorCode { - NACK_ERROR_CODE_UNSPECIFIED = 0; - NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - NACK_ERROR_CODE_STORAGE = 2; - NACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message NackResponse { - repeated NackResult results = 1; -} - -// Stream enqueue — per-write batch with sequence tracking. -message StreamEnqueueRequest { - repeated EnqueueMessage messages = 1; - uint64 sequence_number = 2; -} - -message StreamEnqueueResponse { - uint64 sequence_number = 1; - repeated EnqueueResult results = 2; -} diff --git a/src/transport.ts b/src/transport.ts index b4ace50..3688fb1 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -23,7 +23,7 @@ export const Op = { ACK: 0x03, NACK: 0x04, - // Admin + // Admin (binary body) CREATE_QUEUE: 0x10, DELETE_QUEUE: 0x11, QUEUE_STATS: 0x12, diff --git a/test/helpers.ts b/test/helpers.ts index d1923de..8b94ade 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -1,16 +1,16 @@ -import { spawn, execFileSync } from "child_process"; +import { spawn } from "child_process"; import * as fs from "fs"; +import { execFileSync } from "child_process"; import * as net from "net"; import * as os from "os"; import * as path from "path"; -import * as protobuf from "protobufjs"; import { FibpConnection, Op, + encodeFrame, + nextCorrId, } from "../src/transport"; -const PROTO_DIR = path.join(__dirname, "..", "proto"); - function findServerBinary(): string { if (process.env.FILA_SERVER_BIN) { return process.env.FILA_SERVER_BIN; @@ -56,41 +56,34 @@ export interface TestServerOptions { export const FILA_SERVER_BIN = findServerBinary(); export const FILA_SERVER_AVAILABLE = fs.existsSync(FILA_SERVER_BIN); -// ---- Protobuf loading ------------------------------------------------------- - -let _adminRoot: protobuf.Root | null = null; - -function getAdminRoot(): protobuf.Root { - if (_adminRoot) return _adminRoot; - _adminRoot = new protobuf.Root(); - _adminRoot.resolvePath = (_origin, target) => { - // If the target is already an absolute path (e.g. the initial loadSync - // call), return it unchanged — prepending PROTO_DIR would double the path. - if (path.isAbsolute(target)) return target; - // Resolve google/protobuf includes from the protobufjs built-ins. - if (target.startsWith("google/")) { - return path.join(__dirname, "..", "node_modules", "protobufjs", target); - } - return path.join(PROTO_DIR, target); - }; - _adminRoot.loadSync(path.join(PROTO_DIR, "fila", "v1", "admin.proto")); - _adminRoot.loadSync(path.join(PROTO_DIR, "fila", "v1", "messages.proto")); - return _adminRoot; +// ---- Binary admin encoding --------------------------------------------------- + +/** + * Encode a CreateQueue request using the binary wire format. + * + * Wire format: + * queue_len:u16 + queue:utf8 + * + on_enqueue_len:u16 + on_enqueue:utf8 + * + on_failure_len:u16 + on_failure:utf8 + * + visibility_timeout_ms:u32 + */ +function encodeCreateQueuePayload(name: string): Buffer { + const nameBuf = Buffer.from(name, "utf8"); + const buf = Buffer.allocUnsafe(2 + nameBuf.length + 2 + 2 + 4); + let off = 0; + buf.writeUInt16BE(nameBuf.length, off); off += 2; + nameBuf.copy(buf, off); off += nameBuf.length; + buf.writeUInt16BE(0, off); off += 2; // on_enqueue: empty + buf.writeUInt16BE(0, off); off += 2; // on_failure: empty + buf.writeUInt32BE(0, off); // visibility_timeout_ms: 0 + return buf; } -function encodeAdminMsg(typeName: string, fields: Record): Buffer { - const root = getAdminRoot(); - const MsgType = root.lookupType(typeName); - const err = MsgType.verify(fields); - if (err) throw new Error(`protobuf verify failed: ${err}`); - const msg = MsgType.create(fields); - return Buffer.from(MsgType.encode(msg).finish()); -} - -function decodeAdminMsg(typeName: string, buf: Buffer): T { - const root = getAdminRoot(); - const MsgType = root.lookupType(typeName); - return MsgType.decode(buf) as unknown as T; +/** + * Encode a ListQueues request (empty payload). + */ +function encodeListQueuesPayload(): Buffer { + return Buffer.alloc(0); } // ---- Admin FIBP helpers ----------------------------------------------------- @@ -113,15 +106,12 @@ async function connectAdmin(addr: string, opts: TestServerOptions): Promise { - const payload = encodeAdminMsg("fila.v1.ListQueuesRequest", {}); + const payload = encodeListQueuesPayload(); await conn.request(Op.LIST_QUEUES, payload); } async function callCreateQueue(conn: FibpConnection, name: string): Promise { - const payload = encodeAdminMsg("fila.v1.CreateQueueRequest", { - name, - config: {}, - }); + const payload = encodeCreateQueuePayload(name); await conn.request(Op.CREATE_QUEUE, payload); } @@ -304,6 +294,3 @@ export function generateTestCerts(outputDir: string): { clientKey: fs.readFileSync(clientKeyPath), }; } - -// Re-export for tests that decode admin responses. -export { decodeAdminMsg, encodeAdminMsg };