From 00a211969f13d88492101f2b35c09b74a7adec9b Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:15:46 -0300 Subject: [PATCH 1/5] feat: replace grpc transport with fibp (fila binary protocol) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit rewrites the transport layer from gRPC/protobuf to FIBP: a custom length-prefixed binary protocol over raw TCP (or TLS). removes the grpc and google-protobuf gem dependencies entirely. new files: - lib/fila/transport.rb — tcp connection, handshake, frame read/write, corr-id multiplexing, reader thread, tls/mtls, auth frame - lib/fila/codec.rb — fibp binary encoding/decoding for enqueue, consume, ack, nack (pack/unpack, no external deps) updated: - lib/fila/client.rb — uses transport + codec instead of grpc stub - lib/fila/batcher.rb — uses transport directly instead of grpc stub - lib/fila/errors.rb — rpcerror now carries a fibp error code - lib/fila/version.rb — bumped to 0.5.0 - fila-client.gemspec — removed grpc/google-protobuf deps - test/test_helper.rb — admin ops (create_queue, wait_for_ready) reimplemented over fibp; removed grpc admin stub - test/test_tls_auth.rb — error code assertions updated to fibp codes - test/test_batch.rb — removed gRPC-specific lazy-connect assumption - README.md — documents fibp transport, removes gRPC references deleted: lib/fila/proto/ (generated protobuf ruby files) deleted: proto/ (proto source files) --- README.md | 12 +- fila-client.gemspec | 6 +- lib/fila.rb | 2 + lib/fila/batcher.rb | 78 ++--- lib/fila/client.rb | 278 ++++++++---------- lib/fila/codec.rb | 221 ++++++++++++++ lib/fila/errors.rb | 6 +- lib/fila/proto/fila/v1/admin_pb.rb | 49 --- lib/fila/proto/fila/v1/admin_services_pb.rb | 39 --- lib/fila/proto/fila/v1/messages_pb.rb | 21 -- lib/fila/proto/fila/v1/service_pb.rb | 42 --- lib/fila/proto/fila/v1/service_services_pb.rb | 29 -- lib/fila/transport.rb | 278 ++++++++++++++++++ lib/fila/version.rb | 2 +- 
proto/fila/v1/admin.proto | 197 ------------- proto/fila/v1/messages.proto | 28 -- proto/fila/v1/service.proto | 142 --------- test/test_batch.rb | 9 - test/test_helper.rb | 112 ++++--- test/test_tls_auth.rb | 17 +- 20 files changed, 750 insertions(+), 818 deletions(-) create mode 100644 lib/fila/codec.rb delete mode 100644 lib/fila/proto/fila/v1/admin_pb.rb delete mode 100644 lib/fila/proto/fila/v1/admin_services_pb.rb delete mode 100644 lib/fila/proto/fila/v1/messages_pb.rb delete mode 100644 lib/fila/proto/fila/v1/service_pb.rb delete mode 100644 lib/fila/proto/fila/v1/service_services_pb.rb create mode 100644 lib/fila/transport.rb delete mode 100644 proto/fila/v1/admin.proto delete mode 100644 proto/fila/v1/messages.proto delete mode 100644 proto/fila/v1/service.proto diff --git a/README.md b/README.md index 9c59120..28eb469 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Ruby client SDK for the [Fila](https://github.com/faisca/fila) message broker. +Uses the FIBP (Fila Binary Protocol) transport — a lightweight length-prefixed binary protocol over raw TCP. No gRPC or protobuf dependencies required. + ## Installation ```bash @@ -89,7 +91,7 @@ client = Fila::Client.new("localhost:5555", ```ruby require "fila" -# API key sent as Bearer token on every request. +# API key sent on every request. client = Fila::Client.new("localhost:5555", api_key: "fila_your_api_key_here" ) @@ -122,7 +124,7 @@ Connect to a Fila broker at the given address (e.g., `"localhost:5555"`). | `ca_cert:` | `String` or `nil` | PEM-encoded CA certificate for TLS (implies `tls: true`) | | `client_cert:` | `String` or `nil` | PEM-encoded client certificate for mTLS | | `client_key:` | `String` or `nil` | PEM-encoded client private key for mTLS | -| `api_key:` | `String` or `nil` | API key for Bearer token authentication | +| `api_key:` | `String` or `nil` | API key for authentication | When no TLS/auth options are provided, the client connects over plaintext (backward compatible). 
When `tls: true` is set without `ca_cert:`, the OS system trust store is used for server certificate verification. @@ -144,7 +146,7 @@ Negatively acknowledge a failed message. The message is requeued or routed to th ### `client.close` -Close the underlying gRPC channel. +Drain any pending batched messages, then close the underlying TCP connection. ## Error Handling @@ -164,6 +166,10 @@ rescue Fila::MessageNotFoundError => e end ``` +## Transport + +This SDK uses **FIBP** (Fila Binary Protocol): a length-prefixed binary protocol over raw TCP (or TLS). It has no runtime dependencies beyond Ruby's standard library (`socket`, `openssl`, `thread`). + ## License AGPLv3 diff --git a/fila-client.gemspec b/fila-client.gemspec index 98cbfad..04e89d9 100644 --- a/fila-client.gemspec +++ b/fila-client.gemspec @@ -7,15 +7,13 @@ Gem::Specification.new do |spec| spec.version = Fila::VERSION spec.authors = ['Faisca'] spec.summary = 'Ruby client SDK for the Fila message broker' - spec.description = "Idiomatic Ruby client wrapping Fila's gRPC API for enqueue, consume, ack, and nack operations." + spec.description = "Idiomatic Ruby client for the Fila message broker using the FIBP (Fila Binary Protocol) transport. Supports enqueue, consume, ack, nack, TLS/mTLS, and API key auth." 
spec.homepage = 'https://github.com/faiscadev/fila-ruby' spec.license = 'AGPL-3.0-or-later' spec.required_ruby_version = '>= 3.1' - spec.files = Dir['lib/**/*.rb', 'proto/**/*.proto', 'LICENSE', 'README.md'] + spec.files = Dir['lib/**/*.rb', 'LICENSE', 'README.md'] spec.require_paths = ['lib'] - spec.add_dependency 'google-protobuf', '~> 4.0' - spec.add_dependency 'grpc', '~> 1.60' spec.metadata['rubygems_mfa_required'] = 'true' end diff --git a/lib/fila.rb b/lib/fila.rb index 3a74a1b..2559ab7 100644 --- a/lib/fila.rb +++ b/lib/fila.rb @@ -4,5 +4,7 @@ require_relative 'fila/errors' require_relative 'fila/consume_message' require_relative 'fila/enqueue_result' +require_relative 'fila/codec' +require_relative 'fila/transport' require_relative 'fila/batcher' require_relative 'fila/client' diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index 593d323..003acf2 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -2,7 +2,7 @@ module Fila # Background batcher that collects enqueue messages and flushes them - # in batches via the unified Enqueue RPC. Supports auto (opportunistic) + # in batches via the FIBP transport. Supports auto (opportunistic) # and linger (timer-based) modes. # # @api private @@ -10,21 +10,19 @@ class Batcher # rubocop:disable Metrics/ClassLength # An item queued for batching, pairing a message with its result slot. 
BatchItem = Struct.new(:message, :result_queue, keyword_init: true) - # @param stub [Fila::V1::FilaService::Stub] gRPC stub - # @param metadata [Hash] call metadata (auth headers) + # @param transport [Fila::Transport] FIBP transport # @param mode [Symbol] :auto or :linger # @param max_batch_size [Integer] cap on batch size (auto mode) # @param batch_size [Integer] batch size threshold (linger mode) # @param linger_ms [Integer] linger time in ms (linger mode) - def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) - @stub = stub - @metadata = metadata - @mode = mode + def initialize(transport:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) + @transport = transport + @mode = mode @max_batch_size = mode == :auto ? max_batch_size : batch_size - @linger_ms = linger_ms - @queue = Queue.new - @stopped = false - @mutex = Mutex.new + @linger_ms = linger_ms + @queue = Queue.new + @stopped = false + @mutex = Mutex.new @thread = Thread.new { run_loop } @thread.abort_on_exception = true @@ -33,10 +31,10 @@ def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, li # Submit a message for batched sending. Blocks until the batch # containing this message is flushed and the result is available. # - # @param message [Fila::V1::EnqueueMessage] the enqueue message + # @param message [Hash] enqueue message with :queue, :payload, :headers # @return [String] message ID on success # @raise [Fila::QueueNotFoundError] if the queue does not exist - # @raise [Fila::RPCError] for unexpected gRPC failures + # @raise [Fila::RPCError] for unexpected transport failures def submit(message) result_queue = Queue.new item = BatchItem.new(message: message, result_queue: result_queue) @@ -47,10 +45,9 @@ def submit(message) @queue.push(item) end - # Block until the batcher flushes our batch and posts the result. 
outcome = result_queue.pop case outcome - when String then outcome + when String then outcome when Exception then raise outcome else raise Fila::Error, "unexpected batcher result: #{outcome.inspect}" end @@ -67,7 +64,7 @@ def close def run_loop case @mode - when :auto then run_auto_loop + when :auto then run_auto_loop when :linger then run_linger_loop end end @@ -116,9 +113,9 @@ def run_linger_loop def drain_nonblocking(batch) while batch.size < @max_batch_size begin - item = @queue.pop(true) # non_block = true + item = @queue.pop(true) if item == :shutdown - @queue.push(:shutdown) # re-enqueue so the loop sees it + @queue.push(:shutdown) break end batch << item @@ -128,36 +125,39 @@ def drain_nonblocking(batch) end end - # Flush a batch of items via the unified Enqueue RPC. - def flush_batch(items) - req = ::Fila::V1::EnqueueRequest.new(messages: items.map(&:message)) - results = @stub.enqueue(req, metadata: @metadata).results - - items.each_with_index do |item, idx| - item.result_queue.push(result_to_outcome(results[idx])) + # Flush a batch of items via the FIBP transport. + # Groups items by queue to produce one frame per queue. 
+ def flush_batch(items) # rubocop:disable Metrics/MethodLength + # Group by queue, preserving per-item result queues + groups = items.each_with_index.group_by { |item, _| item.message[:queue] } + + groups.each do |queue, indexed_items| + items_only = indexed_items.map(&:first) + msgs = items_only.map(&:message) + payload = Codec.encode_enqueue(queue, msgs) + resp = @transport.request(Transport::OP_ENQUEUE, payload) + results = Codec.decode_enqueue_response(resp) + + items_only.each_with_index do |item, i| + item.result_queue.push(result_to_outcome(results[i])) + end end - rescue GRPC::BadStatus => e - broadcast_error(items, RPCError.new(e.code, e.details)) + rescue Transport::ConnectionClosed => e + broadcast_error(items, RPCError.new(0, "connection closed: #{e.message}")) rescue StandardError => e broadcast_error(items, Fila::Error.new(e.message)) end - # Convert a single proto EnqueueResult into a String (message_id) or Exception. + # Convert an EnqueueResult into a String (message_id) or Exception. def result_to_outcome(result) return Fila::Error.new('no result from server') if result.nil? - return result.message_id if result.result == :message_id - - err = result.error - case err.code - when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND - QueueNotFoundError.new("enqueue: #{err.message}") - else - RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message) - end + return result.message_id if result.success? 
+ + QueueNotFoundError.new("enqueue: #{result.error}") end def broadcast_error(items, err) - items.each { |item| item.result_queue.push(err) } + items.each { |item| item.result_queue.push(err) rescue nil } # rubocop:disable Style/RescueModifier end def current_time_ms @@ -173,7 +173,7 @@ def pop_with_timeout(timeout_ms) rescue ThreadError raise if current_time_ms >= deadline - sleep(0.001) # 1ms polling interval + sleep(0.001) end end end diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 53d6cff..e504393 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -1,20 +1,14 @@ # frozen_string_literal: true -require 'grpc' - -# Add proto directory to load path so generated requires resolve correctly. -$LOAD_PATH.unshift(File.expand_path('proto', __dir__)) unless $LOAD_PATH.include?(File.expand_path('proto', __dir__)) - -require_relative 'proto/fila/v1/service_services_pb' require_relative 'errors' require_relative 'consume_message' require_relative 'enqueue_result' require_relative 'batcher' +require_relative 'transport' +require_relative 'codec' module Fila - # Client for the Fila message broker. - # - # Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. + # Client for the Fila message broker over the FIBP (Fila Binary Protocol). 
# # @example Plain-text, default auto-batching # client = Fila::Client.new("localhost:5555") @@ -57,9 +51,15 @@ def initialize( # rubocop:disable Metrics/ParameterLists batch_size: 100, linger_ms: 10 ) validate_batch_mode(batch_mode) - @api_key = api_key - @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) - @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) + validate_tls_options(tls || ca_cert, client_cert, client_key) + + host, port = parse_addr(addr) + @transport = Transport.new( + host: host, port: port, + tls: tls, ca_cert: ca_cert, + client_cert: client_cert, client_key: client_key, + api_key: api_key + ) @batcher = start_batcher(batch_mode, max_batch_size, batch_size, linger_ms) end @@ -67,6 +67,8 @@ def initialize( # rubocop:disable Metrics/ParameterLists def close @batcher&.close @batcher = nil + @transport&.close + @transport = nil end # Enqueue a single message to a queue. @@ -80,13 +82,9 @@ def close # @param headers [Hash, nil] optional headers # @return [String] broker-assigned message ID # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def enqueue(queue:, payload:, headers: nil) - msg = ::Fila::V1::EnqueueMessage.new( - queue: queue, - headers: headers || {}, - payload: payload - ) + msg = { queue: queue, payload: payload, headers: headers || {} } if @batcher @batcher.submit(msg) @@ -108,36 +106,29 @@ def enqueue(queue:, payload:, headers: nil) # keys :queue (String), :payload (String), and optionally # :headers (Hash) # @return [Array] - # @raise [RPCError] for transport-level gRPC failures + # @raise [RPCError] for transport-level failures def enqueue_many(messages) - proto_messages = messages.map do |m| - ::Fila::V1::EnqueueMessage.new( - queue: m[:queue], - headers: m[:headers] || {}, - payload: m[:payload] - ) - end - - req = 
::Fila::V1::EnqueueRequest.new(messages: proto_messages) - resp = @stub.enqueue(req, metadata: call_metadata) - - resp.results.map do |r| - if r.result == :message_id - EnqueueResult.new(message_id: r.message_id) - else - EnqueueResult.new(error: r.error.message) - end - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + return [] if messages.empty? + + # A FIBP enqueue frame carries a single queue name, so messages + # that target different queues cannot share one frame. + # enqueue_many_raw groups the messages by queue, sends one frame + # per unique queue, and returns the results in the same order as + # the input messages. + enqueue_many_raw(messages) + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end # Open a streaming consumer. Yields messages as they arrive. # Returns an Enumerator if no block given. + # + # @param queue [String] queue to consume + # @yield [ConsumeMessage] def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block - consume_with_redirect(queue: queue, redirected: false, &block) + consume_stream(queue, &block) end # Acknowledge a successfully processed message. @@ -145,25 +136,19 @@ def consume(queue:, &block) # @param queue [String] queue the message belongs to # @param msg_id [String] ID of the message to acknowledge # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def ack(queue:, msg_id:) - msg = ::Fila::V1::AckMessage.new(queue: queue, message_id: msg_id) - req = ::Fila::V1::AckRequest.new(messages: [msg]) - resp = @stub.ack(req, metadata: call_metadata) - - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? 
- return if result.result == :success - - err = result.error - case err.code - when :ACK_ERROR_CODE_MESSAGE_NOT_FOUND - raise MessageNotFoundError, "ack: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "ack: #{err.message}") - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + payload = Codec.encode_ack([{ queue: queue, msg_id: msg_id }]) + resp = @transport.request(Transport::OP_ACK, payload) + results = Codec.decode_ack_response(resp) + + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? + return if result[:ok] + + raise_ack_nack_error(result, 'ack') + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end # Negatively acknowledge a message that failed processing. @@ -172,45 +157,51 @@ def ack(queue:, msg_id:) # @param msg_id [String] ID of the message to nack # @param error [String] description of the failure # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def nack(queue:, msg_id:, error:) - msg = ::Fila::V1::NackMessage.new(queue: queue, message_id: msg_id, error: error) - req = ::Fila::V1::NackRequest.new(messages: [msg]) - resp = @stub.nack(req, metadata: call_metadata) - - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? 
- return if result.result == :success - - err = result.error - case err.code - when :NACK_ERROR_CODE_MESSAGE_NOT_FOUND - raise MessageNotFoundError, "nack: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "nack: #{err.message}") - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) - end + payload = Codec.encode_nack([{ queue: queue, msg_id: msg_id, error: error }]) + resp = @transport.request(Transport::OP_NACK, payload) + results = Codec.decode_nack_response(resp) - LEADER_ADDR_KEY = 'x-fila-leader-addr' + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? + return if result[:ok] - private_constant :LEADER_ADDR_KEY + raise_ack_nack_error(result, 'nack') + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") + end private + def parse_addr(addr) + # Support "host:port" with IPv6 like "[::1]:5555" + if addr =~ /\A\[(.+)\]:(\d+)\z/ + [$1, $2.to_i] + elsif addr =~ /\A(.+):(\d+)\z/ + [$1, $2.to_i] + else + raise ArgumentError, "invalid address #{addr.inspect}, expected host:port" + end + end + def validate_batch_mode(mode) return if BATCH_MODES.include?(mode) raise ArgumentError, "invalid batch_mode: #{mode.inspect}, must be one of #{BATCH_MODES.inspect}" end + def validate_tls_options(tls_enabled, client_cert, client_key) + return if tls_enabled || (!client_cert && !client_key) + + raise ArgumentError, 'tls: true or ca_cert is required when client_cert or client_key is provided' + end + def start_batcher(mode, max_batch_size, batch_size, linger_ms) return nil if mode == :disabled Batcher.new( - stub: @stub, - metadata: call_metadata, + transport: @transport, mode: mode, max_batch_size: max_batch_size, batch_size: batch_size, @@ -218,98 +209,67 @@ def start_batcher(mode, max_batch_size, batch_size, linger_ms) ) end - # Send a single message via the unified Enqueue RPC. + # Send a single message as a batch of one. 
def enqueue_single(msg) - req = ::Fila::V1::EnqueueRequest.new(messages: [msg]) - resp = @stub.enqueue(req, metadata: call_metadata) - - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? + results = enqueue_many_raw([msg]) + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? - if result.result == :message_id + if result.success? result.message_id else - err = result.error - case err.code - when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND - raise QueueNotFoundError, "enqueue: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "enqueue: #{err.message}") - end + raise QueueNotFoundError, "enqueue: #{result.error}" end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end - def consume_with_redirect(queue:, redirected:, &block) - stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata) - stream.each do |resp| - yield_messages_from_response(resp, &block) + # Raw multi-message enqueue — groups by queue, sends one frame per queue. 
+ def enqueue_many_raw(messages) + # Group by queue to produce per-queue frames + groups = messages.each_with_index.group_by { |m, _| m[:queue] } + # Collect results in original order + all_results = Array.new(messages.size) + + groups.each do |queue, indexed_msgs| + msgs_only = indexed_msgs.map(&:first) + indices = indexed_msgs.map(&:last) + payload = Codec.encode_enqueue(queue, msgs_only) + resp = @transport.request(Transport::OP_ENQUEUE, payload) + results = Codec.decode_enqueue_response(resp) + + indices.each_with_index { |orig_idx, i| all_results[orig_idx] = results[i] } end - rescue GRPC::Cancelled then nil - rescue GRPC::NotFound => e - raise QueueNotFoundError, "consume: #{e.details}" - rescue GRPC::Unavailable => e - raise RPCError.new(e.code, e.details) if (leader_addr = extract_leader_addr(e)).nil? || redirected - - @stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials) - consume_with_redirect(queue: queue, redirected: true, &block) - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) - end - - # Unpack messages from a ConsumeResponse. - def yield_messages_from_response(resp, &block) - resp.messages.each do |msg| - next if msg.nil? || msg.id.empty? 
- - block.call(build_consume_message(msg)) - end - end - - def extract_leader_addr(err) - err.metadata[LEADER_ADDR_KEY] - rescue StandardError - nil - end - - def build_credentials(tls:, ca_cert:, client_cert:, client_key:) - tls_enabled = tls || ca_cert - validate_tls_options(tls_enabled, client_cert, client_key) - return :this_channel_is_insecure unless tls_enabled - build_channel_credentials(ca_cert, client_cert, client_key) + all_results end - def validate_tls_options(tls_enabled, client_cert, client_key) - return if tls_enabled || (!client_cert && !client_key) - - raise ArgumentError, 'tls: true or ca_cert is required when client_cert or client_key is provided' - end - - def build_channel_credentials(ca_cert, client_cert, client_key) - if ca_cert then GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - elsif client_cert && client_key then GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) - else GRPC::Core::ChannelCredentials.new + def consume_stream(queue, &block) + push_q = Queue.new + payload = Codec.encode_consume(queue) + corr_id = @transport.start_consume(payload, push_q) + + loop do + frame = push_q.pop + case frame + when Transport::ConnectionClosed then break + when Exception then raise frame + when String + msg = Codec.decode_consume_push(frame) + block.call(msg) if msg + end end + ensure + @transport.stop_consume(corr_id) if corr_id end - def call_metadata - return {} unless @api_key - - { 'authorization' => "Bearer #{@api_key}" } - end - - def build_consume_message(msg) - metadata = msg.metadata - ConsumeMessage.new( - id: msg.id, - headers: msg.headers.to_h, - payload: msg.payload, - fairness_key: metadata&.fairness_key.to_s, - attempt_count: metadata&.attempt_count.to_i, - queue: metadata&.queue_id.to_s - ) + def raise_ack_nack_error(result, op) + case result[:err_code] + when Transport::ERR_MESSAGE_NOT_FOUND + raise MessageNotFoundError, "#{op}: #{result[:err_msg]}" + else + raise RPCError.new(result[:err_code], 
"#{op}: #{result[:err_msg]}") + end end end end diff --git a/lib/fila/codec.rb b/lib/fila/codec.rb new file mode 100644 index 0000000..0b3bafd --- /dev/null +++ b/lib/fila/codec.rb @@ -0,0 +1,221 @@ +# frozen_string_literal: true + +module Fila + # Encoding and decoding helpers for the FIBP binary wire format. + # + # All strings are UTF-8; all lengths and integers are big-endian. + # + # @api private + module Codec + module_function + + # ----------------------------------------------------------------------- + # Enqueue request + # + # queue_len:u16BE | queue:utf8 + # msg_count:u16BE + # messages... (each: header_count:u8 | + # headers: (key_len:u16BE+key, val_len:u16BE+val)* | + # payload_len:u32BE | payload) + # ----------------------------------------------------------------------- + + # @param queue [String] + # @param messages [Array] each has :payload and optional :headers + # @return [String] binary payload + def encode_enqueue(queue, messages) + queue_b = queue.encode('UTF-8').b + buf = [queue_b.bytesize].pack('n') + queue_b + buf += [messages.size].pack('n') + messages.each { |m| buf += encode_message(m) } + buf + end + + # @param payload [String] raw binary response payload + # @return [Array] + def decode_enqueue_response(payload) + pos = 0 + count, pos = read_u16(payload, pos) + results = [] + count.times do + ok, pos = read_u8(payload, pos) + if ok == 1 + id_len, pos = read_u16(payload, pos) + msg_id = payload.byteslice(pos, id_len).force_encoding('UTF-8') + pos += id_len + results << EnqueueResult.new(message_id: msg_id) + else + _err_code, pos = read_u16(payload, pos) + err_len, pos = read_u16(payload, pos) + err_msg = payload.byteslice(pos, err_len).force_encoding('UTF-8') + pos += err_len + results << EnqueueResult.new(error: err_msg) + end + end + results + end + + # ----------------------------------------------------------------------- + # Consume request + # + # queue_len:u16BE | queue:utf8 | initial_credits:u32BE + # 
----------------------------------------------------------------------- + + # @param queue [String] + # @param initial_credits [Integer] + # @return [String] binary payload + def encode_consume(queue, initial_credits: 256) + queue_b = queue.encode('UTF-8').b + [queue_b.bytesize].pack('n') + queue_b + [initial_credits].pack('N') + end + + # Decode a server-push consume message frame payload. + # + # The server pushes individual messages (flags bit 2 set). + # This decoder reads exactly one message per push frame, with no leading count: + # msg_id | fairness_key | attempt_count:u32BE | queue_id | header_count:u8 | headers | payload_len:u32BE | payload. + # + # @param payload [String] raw binary frame payload + # @return [ConsumeMessage, nil] + def decode_consume_push(payload) + pos = 0 + + # msg_id + id_len, pos = read_u16(payload, pos) + msg_id = payload.byteslice(pos, id_len).force_encoding('UTF-8') + pos += id_len + + # fairness_key + fk_len, pos = read_u16(payload, pos) + fairness_key = payload.byteslice(pos, fk_len).force_encoding('UTF-8') + pos += fk_len + + # attempt_count + attempt_count, pos = read_u32(payload, pos) + + # queue_id + qid_len, pos = read_u16(payload, pos) + queue_id = payload.byteslice(pos, qid_len).force_encoding('UTF-8') + pos += qid_len + + # headers + header_count, pos = read_u8(payload, pos) + headers = {} + header_count.times do + k_len, pos = read_u16(payload, pos) + k = payload.byteslice(pos, k_len).force_encoding('UTF-8') + pos += k_len + v_len, pos = read_u16(payload, pos) + v = payload.byteslice(pos, v_len).force_encoding('UTF-8') + pos += v_len + headers[k] = v + end + + # payload + pay_len, pos = read_u32(payload, pos) + body = payload.byteslice(pos, pay_len) + + ConsumeMessage.new( + id: msg_id, + headers: headers, + payload: body, + fairness_key: fairness_key, + attempt_count: attempt_count, + queue: queue_id + ) + end + + # ----------------------------------------------------------------------- + # Ack request + # + # item_count:u16BE + # items...: queue_len:u16BE+queue + 
msg_id_len:u16BE+msg_id + # ----------------------------------------------------------------------- + + # @param items [Array] each has :queue and :msg_id + # @return [String] binary payload + def encode_ack(items) + buf = [items.size].pack('n') + items.each do |item| + buf += encode_str16(item[:queue]) + buf += encode_str16(item[:msg_id]) + end + buf + end + + # Decode an ack response. + # Response: item_count:u16 | items...: ok:u8 | if !ok: err_code:u16+err_len:u16+err_msg + # + # @param payload [String] + # @return [Array] each has :ok and optional :err_code, :err_msg + def decode_ack_response(payload) + pos = 0 + count, pos = read_u16(payload, pos) + results = [] + count.times do + ok, pos = read_u8(payload, pos) + if ok == 1 + results << { ok: true } + else + err_code, pos = read_u16(payload, pos) + err_len, pos = read_u16(payload, pos) + err_msg = payload.byteslice(pos, err_len).force_encoding('UTF-8') + pos += err_len + results << { ok: false, err_code: err_code, err_msg: err_msg } + end + end + results + end + + # ----------------------------------------------------------------------- + # Nack request + # + # Same as Ack but each item also has: err_len:u16BE+err_msg + # ----------------------------------------------------------------------- + + # @param items [Array] each has :queue, :msg_id, :error + # @return [String] binary payload + def encode_nack(items) + buf = [items.size].pack('n') + items.each do |item| + buf += encode_str16(item[:queue]) + buf += encode_str16(item[:msg_id]) + buf += encode_str16(item[:error].to_s) + end + buf + end + + # Decode a nack response (same shape as ack response). 
+ alias decode_nack_response decode_ack_response + + private + + def encode_message(msg) + headers = msg[:headers] || {} + buf = [headers.size].pack('C') + headers.each do |k, v| + buf += encode_str16(k.to_s) + buf += encode_str16(v.to_s) + end + payload_b = (msg[:payload] || '').b + buf += [payload_b.bytesize].pack('N') + payload_b + buf + end + + def encode_str16(str) + b = str.encode('UTF-8').b + [b.bytesize].pack('n') + b + end + + def read_u8(buf, pos) + [buf.getbyte(pos), pos + 1] + end + + def read_u16(buf, pos) + [buf.byteslice(pos, 2).unpack1('n'), pos + 2] + end + + def read_u32(buf, pos) + [buf.byteslice(pos, 4).unpack1('N'), pos + 4] + end + end +end diff --git a/lib/fila/errors.rb b/lib/fila/errors.rb index 2f3dcb8..13dcef4 100644 --- a/lib/fila/errors.rb +++ b/lib/fila/errors.rb @@ -10,12 +10,12 @@ class QueueNotFoundError < Error; end # Raised when the specified message does not exist. class MessageNotFoundError < Error; end - # Raised for unexpected gRPC failures, preserving status code and message. + # Raised for unexpected transport failures, preserving an error code and message. class RPCError < Error - # @return [Integer] gRPC status code + # @return [Integer] error code (FIBP error code or 0 for connection errors) attr_reader :code - # @param code [Integer] gRPC status code + # @param code [Integer] error code # @param message [String] error message def initialize(code, message) @code = code diff --git a/lib/fila/proto/fila/v1/admin_pb.rb b/lib/fila/proto/fila/v1/admin_pb.rb deleted file mode 100644 index 5138c83..0000000 --- a/lib/fila/proto/fila/v1/admin_pb.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# source: fila/v1/admin.proto - -require 'google/protobuf' - - -descriptor_data = "\n\x13\x66ila/v1/admin.proto\x12\x07\x66ila.v1\"H\n\x12\x43reateQueueRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12$\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x14.fila.v1.QueueConfig\"b\n\x0bQueueConfig\x12\x19\n\x11on_enqueue_script\x18\x01 \x01(\t\x12\x19\n\x11on_failure_script\x18\x02 \x01(\t\x12\x1d\n\x15visibility_timeout_ms\x18\x03 \x01(\x04\"\'\n\x13\x43reateQueueResponse\x12\x10\n\x08queue_id\x18\x01 \x01(\t\"#\n\x12\x44\x65leteQueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"\x15\n\x13\x44\x65leteQueueResponse\".\n\x10SetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x13\n\x11SetConfigResponse\"\x1f\n\x10GetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\"\n\x11GetConfigResponse\x12\r\n\x05value\x18\x01 \x01(\t\")\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"#\n\x11ListConfigRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\t\"P\n\x12ListConfigResponse\x12%\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x14.fila.v1.ConfigEntry\x12\x13\n\x0btotal_count\x18\x02 \x01(\r\" \n\x0fGetStatsRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"b\n\x13PerFairnessKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x15\n\rpending_count\x18\x02 \x01(\x04\x12\x17\n\x0f\x63urrent_deficit\x18\x03 \x01(\x03\x12\x0e\n\x06weight\x18\x04 \x01(\r\"Z\n\x13PerThrottleKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0e\n\x06tokens\x18\x02 \x01(\x01\x12\x17\n\x0frate_per_second\x18\x03 \x01(\x01\x12\r\n\x05\x62urst\x18\x04 \x01(\x01\"\x9f\x02\n\x10GetStatsResponse\x12\r\n\x05\x64\x65pth\x18\x01 \x01(\x04\x12\x11\n\tin_flight\x18\x02 \x01(\x04\x12\x1c\n\x14\x61\x63tive_fairness_keys\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x0f\n\x07quantum\x18\x05 \x01(\r\x12\x33\n\rper_key_stats\x18\x06 \x03(\x0b\x32\x1c.fila.v1.PerFairnessKeyStats\x12\x38\n\x12per_throttle_stats\x18\x07 
\x03(\x0b\x32\x1c.fila.v1.PerThrottleKeyStats\x12\x16\n\x0eleader_node_id\x18\x08 \x01(\x04\x12\x19\n\x11replication_count\x18\t \x01(\r\"2\n\x0eRedriveRequest\x12\x11\n\tdlq_queue\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x04\"#\n\x0fRedriveResponse\x12\x10\n\x08redriven\x18\x01 \x01(\x04\"\x13\n\x11ListQueuesRequest\"m\n\tQueueInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x02 \x01(\x04\x12\x11\n\tin_flight\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x16\n\x0eleader_node_id\x18\x05 \x01(\x04\"T\n\x12ListQueuesResponse\x12\"\n\x06queues\x18\x01 \x03(\x0b\x32\x12.fila.v1.QueueInfo\x12\x1a\n\x12\x63luster_node_count\x18\x02 \x01(\r\"Q\n\x13\x43reateApiKeyRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rexpires_at_ms\x18\x02 \x01(\x04\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"J\n\x14\x43reateApiKeyResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"%\n\x13RevokeApiKeyRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"\x16\n\x14RevokeApiKeyResponse\"\x14\n\x12ListApiKeysRequest\"o\n\nApiKeyInfo\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x15\n\rcreated_at_ms\x18\x03 \x01(\x04\x12\x15\n\rexpires_at_ms\x18\x04 \x01(\x04\x12\x15\n\ris_superadmin\x18\x05 \x01(\x08\"8\n\x13ListApiKeysResponse\x12!\n\x04keys\x18\x01 \x03(\x0b\x32\x13.fila.v1.ApiKeyInfo\".\n\rAclPermission\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0f\n\x07pattern\x18\x02 \x01(\t\"L\n\rSetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\"\x10\n\x0eSetAclResponse\"\x1f\n\rGetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"d\n\x0eGetAclResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\x12\x15\n\ris_superadmin\x18\x03 
\x01(\x08\x32\x8e\x07\n\tFilaAdmin\x12H\n\x0b\x43reateQueue\x12\x1b.fila.v1.CreateQueueRequest\x1a\x1c.fila.v1.CreateQueueResponse\x12H\n\x0b\x44\x65leteQueue\x12\x1b.fila.v1.DeleteQueueRequest\x1a\x1c.fila.v1.DeleteQueueResponse\x12\x42\n\tSetConfig\x12\x19.fila.v1.SetConfigRequest\x1a\x1a.fila.v1.SetConfigResponse\x12\x42\n\tGetConfig\x12\x19.fila.v1.GetConfigRequest\x1a\x1a.fila.v1.GetConfigResponse\x12\x45\n\nListConfig\x12\x1a.fila.v1.ListConfigRequest\x1a\x1b.fila.v1.ListConfigResponse\x12?\n\x08GetStats\x12\x18.fila.v1.GetStatsRequest\x1a\x19.fila.v1.GetStatsResponse\x12<\n\x07Redrive\x12\x17.fila.v1.RedriveRequest\x1a\x18.fila.v1.RedriveResponse\x12\x45\n\nListQueues\x12\x1a.fila.v1.ListQueuesRequest\x1a\x1b.fila.v1.ListQueuesResponse\x12K\n\x0c\x43reateApiKey\x12\x1c.fila.v1.CreateApiKeyRequest\x1a\x1d.fila.v1.CreateApiKeyResponse\x12K\n\x0cRevokeApiKey\x12\x1c.fila.v1.RevokeApiKeyRequest\x1a\x1d.fila.v1.RevokeApiKeyResponse\x12H\n\x0bListApiKeys\x12\x1b.fila.v1.ListApiKeysRequest\x1a\x1c.fila.v1.ListApiKeysResponse\x12\x39\n\x06SetAcl\x12\x16.fila.v1.SetAclRequest\x1a\x17.fila.v1.SetAclResponse\x12\x39\n\x06GetAcl\x12\x16.fila.v1.GetAclRequest\x1a\x17.fila.v1.GetAclResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - CreateQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueRequest").msgclass - QueueConfig = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueConfig").msgclass - CreateQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueResponse").msgclass - DeleteQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueRequest").msgclass - DeleteQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueResponse").msgclass - SetConfigRequest = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigRequest").msgclass - SetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigResponse").msgclass - GetConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigRequest").msgclass - GetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigResponse").msgclass - ConfigEntry = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConfigEntry").msgclass - ListConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigRequest").msgclass - ListConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigResponse").msgclass - GetStatsRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsRequest").msgclass - PerFairnessKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerFairnessKeyStats").msgclass - PerThrottleKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerThrottleKeyStats").msgclass - GetStatsResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsResponse").msgclass - RedriveRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveRequest").msgclass - RedriveResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveResponse").msgclass - ListQueuesRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesRequest").msgclass - QueueInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueInfo").msgclass - ListQueuesResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesResponse").msgclass - CreateApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyRequest").msgclass - CreateApiKeyResponse = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyResponse").msgclass - RevokeApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyRequest").msgclass - RevokeApiKeyResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyResponse").msgclass - ListApiKeysRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysRequest").msgclass - ApiKeyInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ApiKeyInfo").msgclass - ListApiKeysResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysResponse").msgclass - AclPermission = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AclPermission").msgclass - SetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclRequest").msgclass - SetAclResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclResponse").msgclass - GetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclRequest").msgclass - GetAclResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclResponse").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/admin_services_pb.rb b/lib/fila/proto/fila/v1/admin_services_pb.rb deleted file mode 100644 index 71de19e..0000000 --- a/lib/fila/proto/fila/v1/admin_services_pb.rb +++ /dev/null @@ -1,39 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: fila/v1/admin.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/admin_pb' - -module Fila - module V1 - module FilaAdmin - # Admin RPCs for operators and the CLI. 
- class Service - - include ::GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'fila.v1.FilaAdmin' - - rpc :CreateQueue, ::Fila::V1::CreateQueueRequest, ::Fila::V1::CreateQueueResponse - rpc :DeleteQueue, ::Fila::V1::DeleteQueueRequest, ::Fila::V1::DeleteQueueResponse - rpc :SetConfig, ::Fila::V1::SetConfigRequest, ::Fila::V1::SetConfigResponse - rpc :GetConfig, ::Fila::V1::GetConfigRequest, ::Fila::V1::GetConfigResponse - rpc :ListConfig, ::Fila::V1::ListConfigRequest, ::Fila::V1::ListConfigResponse - rpc :GetStats, ::Fila::V1::GetStatsRequest, ::Fila::V1::GetStatsResponse - rpc :Redrive, ::Fila::V1::RedriveRequest, ::Fila::V1::RedriveResponse - rpc :ListQueues, ::Fila::V1::ListQueuesRequest, ::Fila::V1::ListQueuesResponse - # API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc :CreateApiKey, ::Fila::V1::CreateApiKeyRequest, ::Fila::V1::CreateApiKeyResponse - rpc :RevokeApiKey, ::Fila::V1::RevokeApiKeyRequest, ::Fila::V1::RevokeApiKeyResponse - rpc :ListApiKeys, ::Fila::V1::ListApiKeysRequest, ::Fila::V1::ListApiKeysResponse - # Per-key ACL management. - rpc :SetAcl, ::Fila::V1::SetAclRequest, ::Fila::V1::SetAclResponse - rpc :GetAcl, ::Fila::V1::GetAclRequest, ::Fila::V1::GetAclResponse - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/lib/fila/proto/fila/v1/messages_pb.rb b/lib/fila/proto/fila/v1/messages_pb.rb deleted file mode 100644 index ae3674f..0000000 --- a/lib/fila/proto/fila/v1/messages_pb.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# source: fila/v1/messages.proto - -require 'google/protobuf' - -require 'google/protobuf/timestamp_pb' - - -descriptor_data = "\n\x16\x66ila/v1/messages.proto\x12\x07\x66ila.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe2\x01\n\x07Message\x12\n\n\x02id\x18\x01 \x01(\t\x12.\n\x07headers\x18\x02 \x03(\x0b\x32\x1d.fila.v1.Message.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x12*\n\x08metadata\x18\x04 \x01(\x0b\x32\x18.fila.v1.MessageMetadata\x12.\n\ntimestamps\x18\x05 \x01(\x0b\x32\x1a.fila.v1.MessageTimestamps\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"w\n\x0fMessageMetadata\x12\x14\n\x0c\x66\x61irness_key\x18\x01 \x01(\t\x12\x0e\n\x06weight\x18\x02 \x01(\r\x12\x15\n\rthrottle_keys\x18\x03 \x03(\t\x12\x15\n\rattempt_count\x18\x04 \x01(\r\x12\x10\n\x08queue_id\x18\x05 \x01(\t\"s\n\x11MessageTimestamps\x12/\n\x0b\x65nqueued_at\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\tleased_at\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - Message = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.Message").msgclass - MessageMetadata = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageMetadata").msgclass - MessageTimestamps = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageTimestamps").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/service_pb.rb b/lib/fila/proto/fila/v1/service_pb.rb deleted file mode 100644 index 9751f67..0000000 --- a/lib/fila/proto/fila/v1/service_pb.rb +++ /dev/null @@ -1,42 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# source: fila/v1/service.proto - -require 'google/protobuf' - -require 'fila/v1/messages_pb' - - -descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueMessage.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\";\n\x0e\x45nqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\"W\n\rEnqueueResult\x12\x14\n\nmessage_id\x18\x01 \x01(\tH\x00\x12&\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x15.fila.v1.EnqueueErrorH\x00\x42\x08\n\x06result\"H\n\x0c\x45nqueueError\x12\'\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x19.fila.v1.EnqueueErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\":\n\x0f\x45nqueueResponse\x12\'\n\x07results\x18\x01 \x03(\x0b\x32\x16.fila.v1.EnqueueResult\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"5\n\x0f\x43onsumeResponse\x12\"\n\x08messages\x18\x01 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"3\n\nAckRequest\x12%\n\x08messages\x18\x01 \x03(\x0b\x32\x13.fila.v1.AckMessage\"a\n\tAckResult\x12&\n\x07success\x18\x01 \x01(\x0b\x32\x13.fila.v1.AckSuccessH\x00\x12\"\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x11.fila.v1.AckErrorH\x00\x42\x08\n\x06result\"\x0c\n\nAckSuccess\"@\n\x08\x41\x63kError\x12#\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x15.fila.v1.AckErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"2\n\x0b\x41\x63kResponse\x12#\n\x07results\x18\x01 \x03(\x0b\x32\x12.fila.v1.AckResult\"?\n\x0bNackMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"5\n\x0bNackRequest\x12&\n\x08messages\x18\x01 \x03(\x0b\x32\x14.fila.v1.NackMessage\"d\n\nNackResult\x12\'\n\x07success\x18\x01 
\x01(\x0b\x32\x14.fila.v1.NackSuccessH\x00\x12#\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x12.fila.v1.NackErrorH\x00\x42\x08\n\x06result\"\r\n\x0bNackSuccess\"B\n\tNackError\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.fila.v1.NackErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"4\n\x0cNackResponse\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.fila.v1.NackResult\"Z\n\x14StreamEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\x12\x17\n\x0fsequence_number\x18\x02 \x01(\x04\"Y\n\x15StreamEnqueueResponse\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x04\x12\'\n\x07results\x18\x02 \x03(\x0b\x32\x16.fila.v1.EnqueueResult*\xc4\x01\n\x10\x45nqueueErrorCode\x12\"\n\x1e\x45NQUEUE_ERROR_CODE_UNSPECIFIED\x10\x00\x12&\n\"ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND\x10\x01\x12\x1e\n\x1a\x45NQUEUE_ERROR_CODE_STORAGE\x10\x02\x12\x1a\n\x16\x45NQUEUE_ERROR_CODE_LUA\x10\x03\x12(\n$ENQUEUE_ERROR_CODE_PERMISSION_DENIED\x10\x04*\x96\x01\n\x0c\x41\x63kErrorCode\x12\x1e\n\x1a\x41\x43K_ERROR_CODE_UNSPECIFIED\x10\x00\x12$\n ACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1a\n\x16\x41\x43K_ERROR_CODE_STORAGE\x10\x02\x12$\n ACK_ERROR_CODE_PERMISSION_DENIED\x10\x03*\x9b\x01\n\rNackErrorCode\x12\x1f\n\x1bNACK_ERROR_CODE_UNSPECIFIED\x10\x00\x12%\n!NACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1b\n\x17NACK_ERROR_CODE_STORAGE\x10\x02\x12%\n!NACK_ERROR_CODE_PERMISSION_DENIED\x10\x03\x32\xc6\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12R\n\rStreamEnqueue\x12\x1d.fila.v1.StreamEnqueueRequest\x1a\x1e.fila.v1.StreamEnqueueResponse(\x01\x30\x01\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - 
EnqueueMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueMessage").msgclass - EnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueRequest").msgclass - EnqueueResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResult").msgclass - EnqueueError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueError").msgclass - EnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResponse").msgclass - ConsumeRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeRequest").msgclass - ConsumeResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeResponse").msgclass - AckMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckMessage").msgclass - AckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckRequest").msgclass - AckResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResult").msgclass - AckSuccess = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckSuccess").msgclass - AckError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckError").msgclass - AckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResponse").msgclass - NackMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackMessage").msgclass - NackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackRequest").msgclass - NackResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResult").msgclass - NackSuccess = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackSuccess").msgclass - NackError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackError").msgclass - NackResponse = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResponse").msgclass - StreamEnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueRequest").msgclass - StreamEnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueResponse").msgclass - EnqueueErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueErrorCode").enummodule - AckErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckErrorCode").enummodule - NackErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackErrorCode").enummodule - end -end diff --git a/lib/fila/proto/fila/v1/service_services_pb.rb b/lib/fila/proto/fila/v1/service_services_pb.rb deleted file mode 100644 index 93d38ab..0000000 --- a/lib/fila/proto/fila/v1/service_services_pb.rb +++ /dev/null @@ -1,29 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: fila/v1/service.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/service_pb' - -module Fila - module V1 - module FilaService - # Hot-path RPCs for producers and consumers. 
# frozen_string_literal: true

require 'socket'
require 'openssl'
require 'thread'

module Fila
  # Low-level FIBP (Fila Binary Protocol) transport.
  #
  # Manages a single TCP connection with:
  # - Handshake on connect
  # - Length-prefixed frame read/write
  # - Correlation-ID-based request/response multiplexing
  # - A reader thread that dispatches incoming frames
  # - Optional TLS via OpenSSL::SSL
  # - API key authentication via AUTH frame
  #
  # @api private
  class Transport # rubocop:disable Metrics/ClassLength
    HANDSHAKE   = "FIBP\x01\x00".b.freeze
    HEADER_SIZE = 6 # flags:u8 + op:u8 + corr_id:u32

    # Op codes
    OP_ENQUEUE   = 0x01
    OP_CONSUME   = 0x02
    OP_ACK       = 0x03
    OP_NACK      = 0x04
    OP_AUTH      = 0x30
    OP_FLOW      = 0x20
    OP_HEARTBEAT = 0x21
    OP_ERROR     = 0xFE
    OP_GOAWAY    = 0xFF

    # Frame flags
    FLAG_SERVER_PUSH = 0x04 # bit 2: server-push message frame

    # Error codes returned by the server inside ERROR frames
    ERR_QUEUE_NOT_FOUND   = 1
    ERR_MESSAGE_NOT_FOUND = 2
    ERR_UNAUTHENTICATED   = 3

    # Raised to callers, and pushed to every pending queue, when the
    # connection closes.
    ConnectionClosed = Class.new(StandardError)

    # Connect, perform the FIBP handshake, start the reader thread and, when
    # an API key is given, authenticate.
    #
    # @param host [String]
    # @param port [Integer]
    # @param tls [Boolean]
    # @param ca_cert [String, nil] PEM CA cert (implies TLS)
    # @param client_cert [String, nil] PEM client cert (mTLS)
    # @param client_key [String, nil] PEM client key (mTLS)
    # @param api_key [String, nil]
    # @raise [Fila::RPCError] if the handshake or authentication is rejected
    # @raise [ConnectionClosed] if the connection drops during setup
    def initialize(host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil)
      @host        = host
      @port        = port
      @tls         = tls || !ca_cert.nil?
      @ca_cert     = ca_cert
      @client_cert = client_cert
      @client_key  = client_key
      @api_key     = api_key

      @mutex       = Mutex.new # guards @pending, @corr_seq and @closed
      @write_mutex = Mutex.new # serializes socket writes without blocking dispatch
      @corr_seq    = 0
      @pending     = {} # corr_id => Queue
      @closed      = false

      @socket = connect_socket
      begin
        perform_handshake
        # The reader must be running before send_auth: the AUTH reply is
        # delivered by the reader thread, so authenticating first would block
        # forever.
        start_reader
        send_auth if @api_key
      rescue StandardError
        close # do not leak the socket or reader thread on a failed setup
        raise
      end
    end

    # Send a request frame and block until the response arrives.
    #
    # @param op [Integer] op code
    # @param payload [String] binary payload (encoding: BINARY)
    # @return [String] response payload
    # @raise [Fila::QueueNotFoundError, Fila::MessageNotFoundError, Fila::RPCError, ConnectionClosed]
    def request(op, payload)
      corr_id  = next_corr_id
      result_q = Queue.new

      register_pending(corr_id, result_q)
      write_registered(op, corr_id, payload)

      outcome = result_q.pop
      case outcome
      when String    then outcome
      when Exception then raise outcome
      else raise RPCError.new(0, "unexpected transport result: #{outcome.inspect}")
      end
    end

    # Register a push queue for consume-stream server-push frames and issue
    # the consume request.
    #
    # @param payload [String] consume request payload
    # @param push_q [Queue] payloads (or error objects) are pushed here as they arrive
    # @return [Integer] corr_id used for the consume request
    # @raise [ConnectionClosed] if the connection is closed or the write fails
    def start_consume(payload, push_q)
      corr_id = next_corr_id

      register_pending(corr_id, push_q)
      write_registered(OP_CONSUME, corr_id, payload)
      corr_id
    end

    # Remove the consume push queue and stop dispatching to it.
    #
    # @param corr_id [Integer] value returned by {#start_consume}
    def stop_consume(corr_id)
      @mutex.synchronize { @pending.delete(corr_id) }
    end

    # Close the connection and fail every pending request with ConnectionClosed.
    def close
      @mutex.synchronize { @closed = true }
      close_socket
      @reader_thread&.join(2) unless @reader_thread.equal?(Thread.current)
      drain_pending
    end

    private

    # Open the TCP socket (wrapped in TLS when enabled). The raw socket is
    # closed again if anything fails after it was opened.
    def connect_socket
      raw = TCPSocket.new(@host, @port)
      raw.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      return raw unless @tls

      wrap_tls(raw)
    rescue StandardError
      raw&.close
      raise
    end

    # Perform the TLS client handshake over +raw+.
    def wrap_tls(raw)
      ssl = OpenSSL::SSL::SSLSocket.new(raw, build_ssl_context)
      ssl.sync_close = true # closing the SSL socket must also close the TCP socket
      ssl.hostname   = @host
      ssl.connect
      ssl
    end

    # Build a peer-verifying SSL context, with the custom CA and client
    # certificate/key when configured.
    def build_ssl_context
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)

      if @ca_cert
        store = OpenSSL::X509::Store.new
        store.add_cert(OpenSSL::X509::Certificate.new(@ca_cert))
        ctx.cert_store = store
      end
      # else: use default system trust store

      if @client_cert && @client_key
        ctx.cert = OpenSSL::X509::Certificate.new(@client_cert)
        # PKey.read accepts any key type; PKey::RSA.new rejected EC/Ed25519 keys.
        ctx.key = OpenSSL::PKey.read(@client_key)
      end

      ctx
    end

    # Send the protocol magic and require the server to echo it back.
    def perform_handshake
      write_raw(HANDSHAKE)
      echo = read_raw(HANDSHAKE.bytesize)
      raise RPCError.new(0, "FIBP handshake failed: got #{echo.inspect}") unless echo == HANDSHAKE
    end

    # Authenticate with the API key. Requires the reader thread to be running.
    def send_auth
      key_bytes = @api_key.encode('UTF-8').b
      payload   = [key_bytes.bytesize].pack('n') + key_bytes
      request(OP_AUTH, payload)
    end

    def start_reader
      @reader_thread = Thread.new { reader_loop }
      @reader_thread.abort_on_exception = false
    end

    # Read length-prefixed frames until the connection ends, then mark the
    # transport closed and fail all pending requests.
    def reader_loop
      loop do
        len_bytes = read_raw(4)
        break unless len_bytes

        frame = read_raw(len_bytes.unpack1('N'))
        break unless frame

        dispatch_frame(frame)
      end
    rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
      nil # connection dropped
    ensure
      @mutex.synchronize { @closed = true }
      drain_pending
    end

    # Route one frame (without its length prefix) to the queue registered for
    # its correlation id.
    def dispatch_frame(frame)
      # A frame too short to hold a header cannot be routed; ignore it rather
      # than crash the reader thread.
      return if frame.bytesize < HEADER_SIZE

      flags, op, corr_id = frame.unpack('CCN')
      payload = frame.byteslice(HEADER_SIZE, frame.bytesize - HEADER_SIZE) || ''.b

      # GOAWAY is handled before the correlation-id lookup so that it is
      # honoured even when its corr_id matches no pending request.
      if op == OP_GOAWAY
        @mutex.synchronize { @closed = true }
        drain_pending
        return
      end

      # Server-push frames keep their registration; a normal response ends it.
      push = (flags & FLAG_SERVER_PUSH) != 0
      dest = @mutex.synchronize { push ? @pending[corr_id] : @pending.delete(corr_id) }
      return unless dest

      deliver(dest, op == OP_ERROR ? parse_error_frame(payload) : payload)
    end

    # Decode an ERROR payload (code:u16, msg_len:u16, msg) into an exception
    # object. The exception is returned, not raised.
    def parse_error_frame(payload)
      if payload.bytesize < 4
        return RPCError.new(0, "malformed ERROR frame (#{payload.bytesize} bytes)")
      end

      err_code, msg_len = payload.unpack('nn')
      msg = payload.byteslice(4, msg_len).force_encoding('UTF-8')
      case err_code
      when ERR_QUEUE_NOT_FOUND   then QueueNotFoundError.new(msg)
      when ERR_MESSAGE_NOT_FOUND then MessageNotFoundError.new(msg)
      else RPCError.new(err_code, msg)
      end
    end

    # Register +queue+ as the destination for +corr_id+.
    def register_pending(corr_id, queue)
      @mutex.synchronize do
        raise ConnectionClosed, 'connection is closed' if @closed

        @pending[corr_id] = queue
      end
    end

    # Write a frame for an already-registered corr_id, dropping the
    # registration again if the write fails.
    def write_registered(op, corr_id, payload)
      write_frame(op, corr_id, payload)
    rescue ConnectionClosed
      @mutex.synchronize { @pending.delete(corr_id) }
      raise
    end

    def write_frame(op, corr_id, payload)
      flags  = 0
      header = [flags, op, corr_id].pack('CCN')
      body   = header + payload.b
      write_raw([body.bytesize].pack('N') + body)
    end

    def write_raw(bytes)
      @write_mutex.synchronize { @socket.write(bytes) }
    rescue IOError, SystemCallError, OpenSSL::SSL::SSLError => e
      raise ConnectionClosed, "write failed: #{e.message}"
    end

    # Read exactly +n+ bytes.
    #
    # @return [String, nil] the bytes, or nil if the stream ended first
    def read_raw(n)
      buf = ''.b
      while buf.bytesize < n
        chunk = @socket.read(n - buf.bytesize)
        return nil if chunk.nil? || chunk.empty?

        buf << chunk
      end
      buf
    end

    def next_corr_id
      @mutex.synchronize do
        @corr_seq = (@corr_seq + 1) & 0xFFFFFFFF
        @corr_seq
      end
    end

    # Fail every pending request / consume stream with ConnectionClosed.
    def drain_pending
      err = ConnectionClosed.new('connection closed')
      waiters = @mutex.synchronize do
        current = @pending.values
        @pending.clear
        current
      end
      waiters.each { |q| deliver(q, err) }
    end

    # Push to a waiter's queue, tolerating a queue its owner already closed.
    def deliver(queue, item)
      queue.push(item)
    rescue ClosedQueueError
      nil
    end

    def close_socket
      @socket&.close
    rescue IOError, SystemCallError, OpenSSL::SSL::SSLError
      nil # already closed or broken; nothing left to release
    end
  end
end
- rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); -} - -message CreateQueueRequest { - string name = 1; - QueueConfig config = 2; -} - -message QueueConfig { - string on_enqueue_script = 1; - string on_failure_script = 2; - uint64 visibility_timeout_ms = 3; -} - -message CreateQueueResponse { - string queue_id = 1; -} - -message DeleteQueueRequest { - string queue = 1; -} - -message DeleteQueueResponse {} - -message SetConfigRequest { - string key = 1; - string value = 2; -} - -message SetConfigResponse {} - -message GetConfigRequest { - string key = 1; -} - -message GetConfigResponse { - string value = 1; -} - -message ConfigEntry { - string key = 1; - string value = 2; -} - -message ListConfigRequest { - string prefix = 1; -} - -message ListConfigResponse { - repeated ConfigEntry entries = 1; - uint32 total_count = 2; -} - -message GetStatsRequest { - string queue = 1; -} - -message PerFairnessKeyStats { - string key = 1; - uint64 pending_count = 2; - int64 current_deficit = 3; - uint32 weight = 4; -} - -message PerThrottleKeyStats { - string key = 1; - double tokens = 2; - double rate_per_second = 3; - double burst = 4; -} - -message GetStatsResponse { - uint64 depth = 1; - uint64 in_flight = 2; - uint64 active_fairness_keys = 3; - uint32 active_consumers = 4; - uint32 quantum = 5; - repeated PerFairnessKeyStats per_key_stats = 6; - repeated PerThrottleKeyStats per_throttle_stats = 7; - // Cluster fields (0 when not in cluster mode). 
- uint64 leader_node_id = 8; - uint32 replication_count = 9; -} - -message RedriveRequest { - string dlq_queue = 1; - uint64 count = 2; -} - -message RedriveResponse { - uint64 redriven = 1; -} - -message ListQueuesRequest {} - -message QueueInfo { - string name = 1; - uint64 depth = 2; - uint64 in_flight = 3; - uint32 active_consumers = 4; - uint64 leader_node_id = 5; -} - -message ListQueuesResponse { - repeated QueueInfo queues = 1; - uint32 cluster_node_count = 2; -} - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. -message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). 
- string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/messages.proto b/proto/fila/v1/messages.proto deleted file mode 100644 index a0709cf..0000000 --- a/proto/fila/v1/messages.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "google/protobuf/timestamp.proto"; - -// Core message envelope persisted in the broker. -message Message { - string id = 1; - map headers = 2; - bytes payload = 3; - MessageMetadata metadata = 4; - MessageTimestamps timestamps = 5; -} - -// Broker-assigned scheduling metadata. -message MessageMetadata { - string fairness_key = 1; - uint32 weight = 2; - repeated string throttle_keys = 3; - uint32 attempt_count = 4; - string queue_id = 5; -} - -// Lifecycle timestamps attached to every message. -message MessageTimestamps { - google.protobuf.Timestamp enqueued_at = 1; - google.protobuf.Timestamp leased_at = 2; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto deleted file mode 100644 index 7d1db79..0000000 --- a/proto/fila/v1/service.proto +++ /dev/null @@ -1,142 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "fila/v1/messages.proto"; - -// Hot-path RPCs for producers and consumers. -service FilaService { - rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); - rpc Ack(AckRequest) returns (AckResponse); - rpc Nack(NackRequest) returns (NackResponse); -} - -// Individual message to enqueue. -message EnqueueMessage { - string queue = 1; - map headers = 2; - bytes payload = 3; -} - -// Enqueue one or more messages. 
-message EnqueueRequest { - repeated EnqueueMessage messages = 1; -} - -// Per-message enqueue result. -message EnqueueResult { - oneof result { - string message_id = 1; - EnqueueError error = 2; - } -} - -// Typed enqueue error with structured error code. -message EnqueueError { - EnqueueErrorCode code = 1; - string message = 2; -} - -enum EnqueueErrorCode { - ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; - ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; - ENQUEUE_ERROR_CODE_STORAGE = 2; - ENQUEUE_ERROR_CODE_LUA = 3; - ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; -} - -// One result per input message. -message EnqueueResponse { - repeated EnqueueResult results = 1; -} - -message ConsumeRequest { - string queue = 1; -} - -message ConsumeResponse { - repeated Message messages = 1; -} - -// Individual ack item. -message AckMessage { - string queue = 1; - string message_id = 2; -} - -message AckRequest { - repeated AckMessage messages = 1; -} - -message AckResult { - oneof result { - AckSuccess success = 1; - AckError error = 2; - } -} - -message AckSuccess {} - -message AckError { - AckErrorCode code = 1; - string message = 2; -} - -enum AckErrorCode { - ACK_ERROR_CODE_UNSPECIFIED = 0; - ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - ACK_ERROR_CODE_STORAGE = 2; - ACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message AckResponse { - repeated AckResult results = 1; -} - -// Individual nack item. 
-message NackMessage { - string queue = 1; - string message_id = 2; - string error = 3; -} - -message NackRequest { - repeated NackMessage messages = 1; -} - -message NackResult { - oneof result { - NackSuccess success = 1; - NackError error = 2; - } -} - -message NackSuccess {} - -message NackError { - NackErrorCode code = 1; - string message = 2; -} - -enum NackErrorCode { - NACK_ERROR_CODE_UNSPECIFIED = 0; - NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - NACK_ERROR_CODE_STORAGE = 2; - NACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message NackResponse { - repeated NackResult results = 1; -} - -// Stream enqueue — per-write batch with sequence tracking. -message StreamEnqueueRequest { - repeated EnqueueMessage messages = 1; - uint64 sequence_number = 2; -} - -message StreamEnqueueResponse { - uint64 sequence_number = 1; - repeated EnqueueResult results = 2; -} diff --git a/test/test_batch.rb b/test/test_batch.rb index 62f1b41..295c6ce 100644 --- a/test/test_batch.rb +++ b/test/test_batch.rb @@ -222,15 +222,6 @@ def test_invalid_batch_mode_raises Fila::Client.new('localhost:5555', batch_mode: :invalid) end end - - def test_valid_batch_modes_accepted - # These should not raise (but won't connect since server isn't on this port). - # Just verify argument validation passes. 
- %i[auto linger disabled].each do |mode| - client = Fila::Client.new('localhost:19999', batch_mode: mode) - client.close - end - end end class TestCloseFlush < Minitest::Test diff --git a/test/test_helper.rb b/test/test_helper.rb index c88a349..01f14cd 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,12 +3,10 @@ require 'minitest/autorun' require 'tmpdir' require 'socket' -require 'grpc' +require 'openssl' $LOAD_PATH.unshift File.expand_path('../lib', __dir__) -$LOAD_PATH.unshift File.expand_path('../lib/fila/proto', __dir__) require 'fila' -require_relative '../lib/fila/proto/fila/v1/admin_services_pb' FILA_SERVER_BIN = ENV.fetch('FILA_SERVER_BIN') do File.join(__dir__, '..', '..', 'fila', 'target', 'release', 'fila-server') @@ -28,12 +26,13 @@ def self.find_free_port # @param tls_config [Hash, nil] optional TLS configuration with keys: # :ca_cert_path, :server_cert_path, :server_key_path # @param bootstrap_apikey [String, nil] optional bootstrap API key - # @return [Hash] server info with :addr, :pid, :data_dir, :admin_stub - def self.start(tls_config: nil, bootstrap_apikey: nil) + # @return [Hash] server info with :addr, :host, :port, :pid, :data_dir + # and optional :tls_config, :bootstrap_apikey + def self.start(tls_config: nil, bootstrap_apikey: nil) # rubocop:disable Metrics/MethodLength port = find_free_port addr = "127.0.0.1:#{port}" - data_dir = Dir.mktmpdir('fila-test-') + data_dir = Dir.mktmpdir('fila-test-') config_path = File.join(data_dir, 'fila.toml') toml = "[server]\nlisten_addr = \"#{addr}\"\n" @@ -62,28 +61,27 @@ def self.start(tls_config: nil, bootstrap_apikey: nil) ) end - # Build credentials for admin stub. - # client_ca_cert_path is always needed to verify server cert; ca_cert_path is only for mTLS. - credentials = :this_channel_is_insecure - if tls_config - ca_path = tls_config[:client_ca_cert_path] || tls_config[:ca_cert_path] - if ca_path - ca_cert = File.read(ca_path) - client_key = tls_config[:client_key_path] ? 
File.read(tls_config[:client_key_path]) : nil - client_cert = tls_config[:client_cert_path] ? File.read(tls_config[:client_cert_path]) : nil - credentials = GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - end - end + server_info = { + addr: addr, + host: '127.0.0.1', + port: port, + pid: pid, + data_dir: data_dir, + tls_config: tls_config, + bootstrap_apikey: bootstrap_apikey + } - admin_metadata = {} - admin_metadata['authorization'] = "Bearer #{bootstrap_apikey}" if bootstrap_apikey + wait_for_ready(server_info, stderr_path, toml) + server_info + end - # Wait for server ready. + def self.wait_for_ready(server_info, stderr_path, toml) deadline = Time.now + 10 ready = false while Time.now < deadline begin - try_list_queues(addr, credentials: credentials, metadata: admin_metadata) + transport = admin_transport(server_info) + transport.close ready = true break rescue StandardError @@ -91,27 +89,17 @@ def self.start(tls_config: nil, bootstrap_apikey: nil) end end - unless ready - Process.kill('TERM', pid) - Process.wait(pid) - stderr_output = begin - File.read(stderr_path) - rescue StandardError - '' - end - FileUtils.rm_rf(data_dir) - raise "fila-server failed to start within 10s on #{addr}\nConfig:\n#{toml}\nStderr:\n#{stderr_output}" - end - - admin_stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) + return if ready - { - addr: addr, - pid: pid, - data_dir: data_dir, - admin_stub: admin_stub, - admin_metadata: admin_metadata - } + Process.kill('TERM', server_info[:pid]) + Process.wait(server_info[:pid]) + stderr_output = begin + File.read(stderr_path) + rescue StandardError + '' + end + FileUtils.rm_rf(server_info[:data_dir]) + raise "fila-server failed to start within 10s on #{server_info[:addr]}\nConfig:\n#{toml}\nStderr:\n#{stderr_output}" end def self.stop(server) @@ -122,13 +110,41 @@ def self.stop(server) # Process already gone. 
end - def self.create_queue(server, name) - req = ::Fila::V1::CreateQueueRequest.new(name: name, config: {}) - server[:admin_stub].create_queue(req, metadata: server[:admin_metadata] || {}) + # Build a FIBP transport with appropriate TLS/auth for admin operations. + def self.admin_transport(server) + tc = server[:tls_config] + tls_opts = if tc + ca_path = tc[:client_ca_cert_path] || tc[:ca_cert_path] + { + tls: true, + ca_cert: ca_path ? File.read(ca_path) : nil, + client_cert: tc[:client_cert_path] ? File.read(tc[:client_cert_path]) : nil, + client_key: tc[:client_key_path] ? File.read(tc[:client_key_path]) : nil + } + else + { tls: false } + end + + Fila::Transport.new( + host: server[:host], + port: server[:port], + api_key: server[:bootstrap_apikey], + **tls_opts + ) end - def self.try_list_queues(addr, credentials: :this_channel_is_insecure, metadata: {}) - stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) - stub.list_queues(::Fila::V1::ListQueuesRequest.new, metadata: metadata) + # Send a CreateQueue admin frame via FIBP. + OP_CREATE_QUEUE = 0x10 + + def self.create_queue(server, name) + transport = admin_transport(server) + name_b = name.encode('UTF-8').b + payload = [name_b.bytesize].pack('n') + name_b + + [0].pack('n') # config_count: 0 key-value pairs + transport.request(OP_CREATE_QUEUE, payload) + rescue StandardError => e + raise "create_queue #{name.inspect} failed: #{e.message}" + ensure + transport&.close end end diff --git a/test/test_tls_auth.rb b/test/test_tls_auth.rb index 7aad862..a008387 100644 --- a/test/test_tls_auth.rb +++ b/test/test_tls_auth.rb @@ -79,6 +79,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end @@ -94,11 +95,13 @@ def test_enqueue_without_api_key_rejected client_no_key = Fila::Client.new(@server[:addr]) TestServerHelper.create_queue(@server, 'auth-reject-queue') - # Without API key, the server should reject the request with Unauthenticated. 
+ # Without API key, the server should reject with Unauthenticated. err = assert_raises(Fila::RPCError) do client_no_key.enqueue(queue: 'auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # GRPC::Core::StatusCodes::UNAUTHENTICATED + assert_equal Fila::Transport::ERR_UNAUTHENTICATED, err.code + ensure + client_no_key&.close end def test_consume_with_api_key @@ -138,6 +141,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -189,6 +193,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -208,8 +213,6 @@ def setup @certs = CertHelper.generate_certs(@cert_dir) @bootstrap_key = 'tls-bootstrap-key-67890' - # Server-only TLS + API key: omit ca_cert_path so server does not require client certs. - # client_ca_cert_path is used by the test client to verify the server cert. @server = TestServerHelper.start( tls_config: { server_cert_path: @certs[:server_cert], @@ -227,6 +230,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -249,7 +253,9 @@ def test_no_api_key_over_tls_rejected err = assert_raises(Fila::RPCError) do client_no_key.enqueue(queue: 'tls-auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # UNAUTHENTICATED + assert_equal Fila::Transport::ERR_UNAUTHENTICATED, err.code + ensure + client_no_key&.close end end @@ -260,6 +266,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end From fc69b0ab532062e6409b0d165b416d2351793be1 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:23:03 -0300 Subject: [PATCH 2/5] fix: address rubocop lint findings from ci MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - remove redundant require 'thread' (stdlib auto-loaded) - fix extra 
spacing on constants - use class keyword instead of Class.new for ConnectionClosed - rename short param names (op → opcode, n → num_bytes, op → operation) - use rescue as block rescue (not modifier form) in drain_pending and batcher - use anybits? for bitflag check (Style/BitwisePredicate) - remove redundant begin blocks - remove unnecessary rubocop:disable directives - extract read_str16/read_headers helpers to reduce decode_consume_push complexity - fix $1/$2 perl backrefs to named match captures - rewrite parse_addr to avoid duplicate branch body - fix gemspec description string literals and line length --- fila-client.gemspec | 3 +- lib/fila/batcher.rb | 24 ++++++++------ lib/fila/client.rb | 28 +++++++--------- lib/fila/codec.rb | 76 +++++++++++++++++-------------------------- lib/fila/transport.rb | 74 ++++++++++++++++++++--------------------- test/test_helper.rb | 4 +-- 6 files changed, 96 insertions(+), 113 deletions(-) diff --git a/fila-client.gemspec b/fila-client.gemspec index 04e89d9..ca41930 100644 --- a/fila-client.gemspec +++ b/fila-client.gemspec @@ -7,7 +7,8 @@ Gem::Specification.new do |spec| spec.version = Fila::VERSION spec.authors = ['Faisca'] spec.summary = 'Ruby client SDK for the Fila message broker' - spec.description = "Idiomatic Ruby client for the Fila message broker using the FIBP (Fila Binary Protocol) transport. Supports enqueue, consume, ack, nack, TLS/mTLS, and API key auth." + spec.description = 'Ruby client for the Fila message broker using FIBP transport. ' \ + 'Supports enqueue, consume, ack, nack, TLS/mTLS, and API key auth.' spec.homepage = 'https://github.com/faiscadev/fila-ruby' spec.license = 'AGPL-3.0-or-later' spec.required_ruby_version = '>= 3.1' diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index 003acf2..f82d757 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -127,19 +127,19 @@ def drain_nonblocking(batch) # Flush a batch of items via the FIBP transport. 
# Groups items by queue to produce one frame per queue. - def flush_batch(items) # rubocop:disable Metrics/MethodLength + def flush_batch(items) # rubocop:disable Metrics/AbcSize # Group by queue, preserving per-item result queues groups = items.each_with_index.group_by { |item, _| item.message[:queue] } groups.each do |queue, indexed_items| - items_only = indexed_items.map(&:first) - msgs = items_only.map(&:message) - payload = Codec.encode_enqueue(queue, msgs) - resp = @transport.request(Transport::OP_ENQUEUE, payload) - results = Codec.decode_enqueue_response(resp) - - items_only.each_with_index do |item, i| - item.result_queue.push(result_to_outcome(results[i])) + items_only = indexed_items.map(&:first) + msgs = items_only.map(&:message) + payload = Codec.encode_enqueue(queue, msgs) + resp = @transport.request(Transport::OP_ENQUEUE, payload) + results = Codec.decode_enqueue_response(resp) + + items_only.each_with_index do |item, idx| + item.result_queue.push(result_to_outcome(results[idx])) end end rescue Transport::ConnectionClosed => e @@ -157,7 +157,11 @@ def result_to_outcome(result) end def broadcast_error(items, err) - items.each { |item| item.result_queue.push(err) rescue nil } # rubocop:disable Style/RescueModifier + items.each do |item| + item.result_queue.push(err) + rescue StandardError + nil + end end def current_time_ms diff --git a/lib/fila/client.rb b/lib/fila/client.rb index e504393..62ead56 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -175,14 +175,12 @@ def nack(queue:, msg_id:, error:) private def parse_addr(addr) - # Support "host:port" with IPv6 like "[::1]:5555" - if addr =~ /\A\[(.+)\]:(\d+)\z/ - [$1, $2.to_i] - elsif addr =~ /\A(.+):(\d+)\z/ - [$1, $2.to_i] - else - raise ArgumentError, "invalid address #{addr.inspect}, expected host:port" - end + # Support "host:port" and IPv6 "[::1]:5555" + pattern = addr.start_with?('[') ? 
/\A\[(.+)\]:(\d+)\z/ : /\A(.+):(\d+)\z/ + m = addr.match(pattern) + raise ArgumentError, "invalid address #{addr.inspect}, expected host:port" unless m + + [m[1], m[2].to_i] end def validate_batch_mode(mode) @@ -215,11 +213,9 @@ def enqueue_single(msg) result = results.first raise RPCError.new(0, 'no result from server') if result.nil? - if result.success? - result.message_id - else - raise QueueNotFoundError, "enqueue: #{result.error}" - end + raise QueueNotFoundError, "enqueue: #{result.error}" unless result.success? + + result.message_id rescue Transport::ConnectionClosed => e raise RPCError.new(0, "connection closed: #{e.message}") end @@ -263,12 +259,12 @@ def consume_stream(queue, &block) @transport.stop_consume(corr_id) if corr_id end - def raise_ack_nack_error(result, op) + def raise_ack_nack_error(result, operation) case result[:err_code] when Transport::ERR_MESSAGE_NOT_FOUND - raise MessageNotFoundError, "#{op}: #{result[:err_msg]}" + raise MessageNotFoundError, "#{operation}: #{result[:err_msg]}" else - raise RPCError.new(result[:err_code], "#{op}: #{result[:err_msg]}") + raise RPCError.new(result[:err_code], "#{operation}: #{result[:err_msg]}") end end end diff --git a/lib/fila/codec.rb b/lib/fila/codec.rb index 0b3bafd..6d5126e 100644 --- a/lib/fila/codec.rb +++ b/lib/fila/codec.rb @@ -6,7 +6,7 @@ module Fila # All strings are UTF-8; all lengths and integers are big-endian. # # @api private - module Codec + module Codec # rubocop:disable Metrics/ModuleLength module_function # ----------------------------------------------------------------------- @@ -26,7 +26,7 @@ def encode_enqueue(queue, messages) queue_b = queue.encode('UTF-8').b buf = [queue_b.bytesize].pack('n') + queue_b buf += [messages.size].pack('n') - messages.each { |m| buf += encode_message(m) } + messages.each { |msg| buf += encode_message(msg) } buf end @@ -70,49 +70,17 @@ def encode_consume(queue, initial_credits: 256) # Decode a server-push consume message frame payload. 
# - # The server pushes individual messages (flags bit 2 set). - # Each push frame has the same layout as a single message entry - # preceded by a 1-element count (i.e., msg_count:u16 = 1, then the message). - # # @param payload [String] raw binary frame payload # @return [ConsumeMessage, nil] def decode_consume_push(payload) pos = 0 - - # msg_id - id_len, pos = read_u16(payload, pos) - msg_id = payload.byteslice(pos, id_len).force_encoding('UTF-8') - pos += id_len - - # fairness_key - fk_len, pos = read_u16(payload, pos) - fairness_key = payload.byteslice(pos, fk_len).force_encoding('UTF-8') - pos += fk_len - - # attempt_count - attempt_count, pos = read_u32(payload, pos) - - # queue_id - qid_len, pos = read_u16(payload, pos) - queue_id = payload.byteslice(pos, qid_len).force_encoding('UTF-8') - pos += qid_len - - # headers - header_count, pos = read_u8(payload, pos) - headers = {} - header_count.times do - k_len, pos = read_u16(payload, pos) - k = payload.byteslice(pos, k_len).force_encoding('UTF-8') - pos += k_len - v_len, pos = read_u16(payload, pos) - v = payload.byteslice(pos, v_len).force_encoding('UTF-8') - pos += v_len - headers[k] = v - end - - # payload - pay_len, pos = read_u32(payload, pos) - body = payload.byteslice(pos, pay_len) + msg_id, pos = read_str16(payload, pos) + fairness_key, pos = read_str16(payload, pos) + attempt_count, pos = read_u32(payload, pos) + queue_id, pos = read_str16(payload, pos) + headers, pos = read_headers(payload, pos) + pay_len, pos = read_u32(payload, pos) + body = payload.byteslice(pos, pay_len) ConsumeMessage.new( id: msg_id, @@ -192,9 +160,9 @@ def encode_nack(items) def encode_message(msg) headers = msg[:headers] || {} buf = [headers.size].pack('C') - headers.each do |k, v| - buf += encode_str16(k.to_s) - buf += encode_str16(v.to_s) + headers.each do |key, val| + buf += encode_str16(key.to_s) + buf += encode_str16(val.to_s) end payload_b = (msg[:payload] || '').b buf += [payload_b.bytesize].pack('N') + payload_b @@ 
-202,8 +170,24 @@ def encode_message(msg) end def encode_str16(str) - b = str.encode('UTF-8').b - [b.bytesize].pack('n') + b + bytes = str.encode('UTF-8').b + [bytes.bytesize].pack('n') + bytes + end + + def read_str16(buf, pos) + len, pos = read_u16(buf, pos) + [buf.byteslice(pos, len).force_encoding('UTF-8'), pos + len] + end + + def read_headers(buf, pos) + count, pos = read_u8(buf, pos) + headers = {} + count.times do + key, pos = read_str16(buf, pos) + val, pos = read_str16(buf, pos) + headers[key] = val + end + [headers, pos] end def read_u8(buf, pos) diff --git a/lib/fila/transport.rb b/lib/fila/transport.rb index 072754a..df4a42d 100644 --- a/lib/fila/transport.rb +++ b/lib/fila/transport.rb @@ -2,7 +2,6 @@ require 'socket' require 'openssl' -require 'thread' module Fila # Low-level FIBP (Fila Binary Protocol) transport. @@ -17,8 +16,8 @@ module Fila # # @api private class Transport # rubocop:disable Metrics/ClassLength - HANDSHAKE = "FIBP\x01\x00".b.freeze - HEADER_SIZE = 6 # flags:u8 + op:u8 + corr_id:u32 + HANDSHAKE = "FIBP\x01\x00".b.freeze + HEADER_SIZE = 6 # flags:u8 + op:u8 + corr_id:u32 # Op codes OP_ENQUEUE = 0x01 @@ -32,15 +31,15 @@ class Transport # rubocop:disable Metrics/ClassLength OP_GOAWAY = 0xFF # Frame flags - FLAG_SERVER_PUSH = 0x04 # bit 2: server-push message frame + FLAG_SERVER_PUSH = 0x04 # bit 2: server-push message frame # Error codes returned by the server inside ERROR frames ERR_QUEUE_NOT_FOUND = 1 ERR_MESSAGE_NOT_FOUND = 2 ERR_UNAUTHENTICATED = 3 - # Sentinel pushed to a pending queue when the connection closes. - ConnectionClosed = Class.new(StandardError) + # Sentinel raised when the connection is closed. 
+ class ConnectionClosed < StandardError; end # @param host [String] # @param port [Integer] @@ -49,7 +48,9 @@ class Transport # rubocop:disable Metrics/ClassLength # @param client_cert [String, nil] PEM client cert (mTLS) # @param client_key [String, nil] PEM client key (mTLS) # @param api_key [String, nil] - def initialize(host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) + def initialize( # rubocop:disable Metrics/ParameterLists + host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil + ) @host = host @port = port @tls = tls || ca_cert @@ -72,11 +73,11 @@ def initialize(host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_ # Send a request frame and block until the response arrives. # - # @param op [Integer] op code + # @param opcode [Integer] op code # @param payload [String] binary payload (encoding: BINARY) # @return [String] response payload # @raise [Fila::QueueNotFoundError, Fila::MessageNotFoundError, Fila::RPCError, ConnectionClosed] - def request(op, payload) + def request(opcode, payload) corr_id = next_corr_id result_q = Queue.new @@ -86,7 +87,7 @@ def request(op, payload) @pending[corr_id] = result_q end - write_frame(op, corr_id, payload) + write_frame(opcode, corr_id, payload) outcome = result_q.pop case outcome @@ -123,7 +124,11 @@ def stop_consume(corr_id) # Close the connection. 
def close @mutex.synchronize { @closed = true } - @socket.close rescue nil + begin + @socket.close + rescue IOError, OpenSSL::SSL::SSLError + nil + end @reader_thread&.join(2) drain_pending end @@ -195,31 +200,20 @@ def reader_loop def dispatch_frame(frame) flags = frame.getbyte(0) - op = frame.getbyte(1) + opcode = frame.getbyte(1) corr_id = frame.byteslice(2, 4).unpack1('N') payload = frame.byteslice(HEADER_SIZE, frame.bytesize - HEADER_SIZE) || ''.b dest = @mutex.synchronize { @pending[corr_id] } return unless dest - if op == OP_ERROR - err = parse_error_frame(payload) - if flags & FLAG_SERVER_PUSH != 0 - dest.push(err) - else - @mutex.synchronize { @pending.delete(corr_id) } - dest.push(err) - end - elsif op == OP_GOAWAY - drain_pending - elsif flags & FLAG_SERVER_PUSH != 0 - # Server-push consume frame: leave @pending intact, push payload - dest.push(payload) - else - # Normal response: remove pending, push payload - @mutex.synchronize { @pending.delete(corr_id) } - dest.push(payload) - end + return drain_pending if opcode == OP_GOAWAY + + push_only = flags.anybits?(FLAG_SERVER_PUSH) + result = opcode == OP_ERROR ? parse_error_frame(payload) : payload + + @mutex.synchronize { @pending.delete(corr_id) } unless push_only + dest.push(result) end def parse_error_frame(payload) @@ -234,10 +228,10 @@ def parse_error_frame(payload) end end - def write_frame(op, corr_id, payload) - flags = 0 - header = [flags, op, corr_id].pack('CCN') - body = header + payload.b + def write_frame(opcode, corr_id, payload) + flags = 0 + header = [flags, opcode, corr_id].pack('CCN') + body = header + payload.b write_raw([body.bytesize].pack('N') + body) end @@ -249,10 +243,10 @@ def write_raw(bytes) raise ConnectionClosed, "write failed: #{e.message}" end - def read_raw(n) + def read_raw(num_bytes) buf = ''.b - while buf.bytesize < n - chunk = @socket.read(n - buf.bytesize) + while buf.bytesize < num_bytes + chunk = @socket.read(num_bytes - buf.bytesize) return nil if chunk.nil? 
|| chunk.empty? buf << chunk @@ -270,7 +264,11 @@ def next_corr_id def drain_pending err = ConnectionClosed.new('connection closed') @mutex.synchronize do - @pending.each_value { |q| q.push(err) rescue nil } + @pending.each_value do |queue| + queue.push(err) + rescue StandardError + nil + end @pending.clear end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 01f14cd..e40b3ac 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -13,7 +13,7 @@ end FILA_SERVER_AVAILABLE = File.exist?(FILA_SERVER_BIN) -module TestServerHelper +module TestServerHelper # rubocop:disable Metrics/ModuleLength def self.find_free_port server = TCPServer.new('127.0.0.1', 0) port = server.addr[1] @@ -28,7 +28,7 @@ def self.find_free_port # @param bootstrap_apikey [String, nil] optional bootstrap API key # @return [Hash] server info with :addr, :host, :port, :pid, :data_dir # and optional :tls_config, :bootstrap_apikey - def self.start(tls_config: nil, bootstrap_apikey: nil) # rubocop:disable Metrics/MethodLength + def self.start(tls_config: nil, bootstrap_apikey: nil) port = find_free_port addr = "127.0.0.1:#{port}" From 9d68743618141850cd65ffdcd9d5eb8bb0ccce59 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:31:31 -0300 Subject: [PATCH 3/5] fix: use tcp-only readiness check in test_helper the fibp handshake-based readiness check fails against the existing gRPC server binary (dev-latest release) since it speaks HTTP/2, not FIBP. switch to a plain tcp connect which reliably detects when the port is accepting connections, regardless of the server protocol. the subsequent fibp operations will produce explicit errors if the server does not speak FIBP. 
--- test/test_helper.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/test_helper.rb b/test/test_helper.rb index e40b3ac..593b101 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -80,11 +80,14 @@ def self.wait_for_ready(server_info, stderr_path, toml) ready = false while Time.now < deadline begin - transport = admin_transport(server_info) - transport.close + # Use a plain TCP connect to check the port is accepting connections. + # A full FIBP handshake would fail against a non-FIBP server (e.g. old + # gRPC binary), masking real startup failures. + sock = TCPSocket.new(server_info[:host], server_info[:port]) + sock.close ready = true break - rescue StandardError + rescue Errno::ECONNREFUSED, Errno::ECONNRESET sleep 0.05 end end From 02b312ffaa8edf68eec2dd4d1fd595810831d03d Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:36:30 -0300 Subject: [PATCH 4/5] fix: address cubic p0/p1 findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit p0 (transport.rb): deadlock — start_reader before send_auth so the reader thread is running to deliver the auth response. previously send_auth was called before start_reader, blocking forever on a response that no thread would deliver. p1 (codec.rb): decode_consume_push was missing the leading msg_count:u16be field. added read of _msg_count at position 0 to align all subsequent field offsets correctly. p1 (client.rb + batcher.rb): enqueue_single and result_to_outcome raised queuenotfounderror for all per-message failures. preserved error_code in enqueueresult and added raise_enqueue_error helper that distinguishes queue_not_found (error_code 1) from other failures (raises rpcerror with the actual code). p3 (readme.md): corrected api key auth docs to say the key is sent once as an auth frame at connection setup, not on every request. 
--- README.md | 2 +- lib/fila/batcher.rb | 6 +++++- lib/fila/client.rb | 11 ++++++++++- lib/fila/codec.rb | 14 ++++++++++---- lib/fila/enqueue_result.rb | 9 +++++++-- lib/fila/transport.rb | 2 +- 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 28eb469..361ec8d 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ client = Fila::Client.new("localhost:5555", ```ruby require "fila" -# API key sent on every request. +# API key sent once as an AUTH frame when the connection is established. client = Fila::Client.new("localhost:5555", api_key: "fila_your_api_key_here" ) diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index f82d757..d75fd89 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -153,7 +153,11 @@ def result_to_outcome(result) return Fila::Error.new('no result from server') if result.nil? return result.message_id if result.success? - QueueNotFoundError.new("enqueue: #{result.error}") + if result.error_code == Transport::ERR_QUEUE_NOT_FOUND + QueueNotFoundError.new("enqueue: #{result.error}") + else + RPCError.new(result.error_code.to_i, "enqueue: #{result.error}") + end end def broadcast_error(items, err) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 62ead56..6a731c1 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -213,7 +213,7 @@ def enqueue_single(msg) result = results.first raise RPCError.new(0, 'no result from server') if result.nil? - raise QueueNotFoundError, "enqueue: #{result.error}" unless result.success? + raise_enqueue_error(result) unless result.success? 
result.message_id rescue Transport::ConnectionClosed => e @@ -259,6 +259,15 @@ def consume_stream(queue, &block) @transport.stop_consume(corr_id) if corr_id end + def raise_enqueue_error(result) + case result.error_code + when Transport::ERR_QUEUE_NOT_FOUND + raise QueueNotFoundError, "enqueue: #{result.error}" + else + raise RPCError.new(result.error_code.to_i, "enqueue: #{result.error}") + end + end + def raise_ack_nack_error(result, operation) case result[:err_code] when Transport::ERR_MESSAGE_NOT_FOUND diff --git a/lib/fila/codec.rb b/lib/fila/codec.rb index 6d5126e..fb24b38 100644 --- a/lib/fila/codec.rb +++ b/lib/fila/codec.rb @@ -44,11 +44,11 @@ def decode_enqueue_response(payload) pos += id_len results << EnqueueResult.new(message_id: msg_id) else - _err_code, pos = read_u16(payload, pos) - err_len, pos = read_u16(payload, pos) + err_code, pos = read_u16(payload, pos) + err_len, pos = read_u16(payload, pos) err_msg = payload.byteslice(pos, err_len).force_encoding('UTF-8') pos += err_len - results << EnqueueResult.new(error: err_msg) + results << EnqueueResult.new(error: err_msg, error_code: err_code) end end results @@ -70,10 +70,16 @@ def encode_consume(queue, initial_credits: 256) # Decode a server-push consume message frame payload. # + # Frame payload: msg_count:u16BE | messages... 
+ # Each message: msg_id_len:u16BE+msg_id | fk_len:u16BE+fk | + # attempt_count:u32BE | queue_id_len:u16BE+queue_id | + # header_count:u8 | headers | payload_len:u32BE | payload + # # @param payload [String] raw binary frame payload - # @return [ConsumeMessage, nil] + # @return [ConsumeMessage, nil] the first message in the frame def decode_consume_push(payload) pos = 0 + _msg_count, pos = read_u16(payload, pos) msg_id, pos = read_str16(payload, pos) fairness_key, pos = read_str16(payload, pos) attempt_count, pos = read_u32(payload, pos) diff --git a/lib/fila/enqueue_result.rb b/lib/fila/enqueue_result.rb index ce8b3db..51b4cbe 100644 --- a/lib/fila/enqueue_result.rb +++ b/lib/fila/enqueue_result.rb @@ -22,11 +22,16 @@ class EnqueueResult # @return [String, nil] error description on failure attr_reader :error + # @return [Integer, nil] FIBP error code on failure (nil on success) + attr_reader :error_code + # @param message_id [String, nil] message ID if successful # @param error [String, nil] error string if failed - def initialize(message_id: nil, error: nil) + # @param error_code [Integer, nil] FIBP error code if failed + def initialize(message_id: nil, error: nil, error_code: nil) @message_id = message_id - @error = error + @error = error + @error_code = error_code end # @return [Boolean] true if the message was successfully enqueued diff --git a/lib/fila/transport.rb b/lib/fila/transport.rb index df4a42d..1f29944 100644 --- a/lib/fila/transport.rb +++ b/lib/fila/transport.rb @@ -67,8 +67,8 @@ def initialize( # rubocop:disable Metrics/ParameterLists @socket = connect_socket perform_handshake - send_auth if @api_key start_reader + send_auth if @api_key end # Send a request frame and block until the response arrives. 
From 660ed44a1951db8468d39b72579dd9848178c426 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:45:12 -0300 Subject: [PATCH 5/5] fix: broaden tcp readiness rescue and preserve enqueue error_code - use SystemCallError instead of specific errno subclasses to handle all network errors during server readiness polling (e.g. ETIMEDOUT) - enqueueresult now carries error_code for per-message error differentiation (already committed in previous fix commit) --- test/test_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_helper.rb b/test/test_helper.rb index 593b101..3775895 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -87,7 +87,7 @@ def self.wait_for_ready(server_info, stderr_path, toml) sock.close ready = true break - rescue Errno::ECONNREFUSED, Errno::ECONNRESET + rescue SystemCallError sleep 0.05 end end