diff --git a/README.md b/README.md index 9c59120..361ec8d 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Ruby client SDK for the [Fila](https://github.com/faisca/fila) message broker. +Uses the FIBP (Fila Binary Protocol) transport — a lightweight length-prefixed binary protocol over raw TCP. No gRPC or protobuf dependencies required. + ## Installation ```bash @@ -89,7 +91,7 @@ client = Fila::Client.new("localhost:5555", ```ruby require "fila" -# API key sent as Bearer token on every request. +# API key sent once as an AUTH frame when the connection is established. client = Fila::Client.new("localhost:5555", api_key: "fila_your_api_key_here" ) @@ -122,7 +124,7 @@ Connect to a Fila broker at the given address (e.g., `"localhost:5555"`). | `ca_cert:` | `String` or `nil` | PEM-encoded CA certificate for TLS (implies `tls: true`) | | `client_cert:` | `String` or `nil` | PEM-encoded client certificate for mTLS | | `client_key:` | `String` or `nil` | PEM-encoded client private key for mTLS | -| `api_key:` | `String` or `nil` | API key for Bearer token authentication | +| `api_key:` | `String` or `nil` | API key for authentication | When no TLS/auth options are provided, the client connects over plaintext (backward compatible). When `tls: true` is set without `ca_cert:`, the OS system trust store is used for server certificate verification. @@ -144,7 +146,7 @@ Negatively acknowledge a failed message. The message is requeued or routed to th ### `client.close` -Close the underlying gRPC channel. +Drain any pending batched messages, then close the underlying TCP connection. ## Error Handling @@ -164,6 +166,10 @@ rescue Fila::MessageNotFoundError => e end ``` +## Transport + +This SDK uses **FIBP** (Fila Binary Protocol): a length-prefixed binary protocol over raw TCP (or TLS). It has no runtime dependencies beyond Ruby's standard library (`socket`, `openssl`, `thread`). 
+ ## License AGPLv3 diff --git a/fila-client.gemspec b/fila-client.gemspec index 98cbfad..ca41930 100644 --- a/fila-client.gemspec +++ b/fila-client.gemspec @@ -7,15 +7,14 @@ Gem::Specification.new do |spec| spec.version = Fila::VERSION spec.authors = ['Faisca'] spec.summary = 'Ruby client SDK for the Fila message broker' - spec.description = "Idiomatic Ruby client wrapping Fila's gRPC API for enqueue, consume, ack, and nack operations." + spec.description = 'Ruby client for the Fila message broker using FIBP transport. ' \ + 'Supports enqueue, consume, ack, nack, TLS/mTLS, and API key auth.' spec.homepage = 'https://github.com/faiscadev/fila-ruby' spec.license = 'AGPL-3.0-or-later' spec.required_ruby_version = '>= 3.1' - spec.files = Dir['lib/**/*.rb', 'proto/**/*.proto', 'LICENSE', 'README.md'] + spec.files = Dir['lib/**/*.rb', 'LICENSE', 'README.md'] spec.require_paths = ['lib'] - spec.add_dependency 'google-protobuf', '~> 4.0' - spec.add_dependency 'grpc', '~> 1.60' spec.metadata['rubygems_mfa_required'] = 'true' end diff --git a/lib/fila.rb b/lib/fila.rb index 3a74a1b..2559ab7 100644 --- a/lib/fila.rb +++ b/lib/fila.rb @@ -4,5 +4,7 @@ require_relative 'fila/errors' require_relative 'fila/consume_message' require_relative 'fila/enqueue_result' +require_relative 'fila/codec' +require_relative 'fila/transport' require_relative 'fila/batcher' require_relative 'fila/client' diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index 593d323..d75fd89 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -2,7 +2,7 @@ module Fila # Background batcher that collects enqueue messages and flushes them - # in batches via the unified Enqueue RPC. Supports auto (opportunistic) + # in batches via the FIBP transport. Supports auto (opportunistic) # and linger (timer-based) modes. # # @api private @@ -10,21 +10,19 @@ class Batcher # rubocop:disable Metrics/ClassLength # An item queued for batching, pairing a message with its result slot. 
BatchItem = Struct.new(:message, :result_queue, keyword_init: true) - # @param stub [Fila::V1::FilaService::Stub] gRPC stub - # @param metadata [Hash] call metadata (auth headers) + # @param transport [Fila::Transport] FIBP transport # @param mode [Symbol] :auto or :linger # @param max_batch_size [Integer] cap on batch size (auto mode) # @param batch_size [Integer] batch size threshold (linger mode) # @param linger_ms [Integer] linger time in ms (linger mode) - def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) - @stub = stub - @metadata = metadata - @mode = mode + def initialize(transport:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) + @transport = transport + @mode = mode @max_batch_size = mode == :auto ? max_batch_size : batch_size - @linger_ms = linger_ms - @queue = Queue.new - @stopped = false - @mutex = Mutex.new + @linger_ms = linger_ms + @queue = Queue.new + @stopped = false + @mutex = Mutex.new @thread = Thread.new { run_loop } @thread.abort_on_exception = true @@ -33,10 +31,10 @@ def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, li # Submit a message for batched sending. Blocks until the batch # containing this message is flushed and the result is available. # - # @param message [Fila::V1::EnqueueMessage] the enqueue message + # @param message [Hash] enqueue message with :queue, :payload, :headers # @return [String] message ID on success # @raise [Fila::QueueNotFoundError] if the queue does not exist - # @raise [Fila::RPCError] for unexpected gRPC failures + # @raise [Fila::RPCError] for unexpected transport failures def submit(message) result_queue = Queue.new item = BatchItem.new(message: message, result_queue: result_queue) @@ -47,10 +45,9 @@ def submit(message) @queue.push(item) end - # Block until the batcher flushes our batch and posts the result. 
outcome = result_queue.pop case outcome - when String then outcome + when String then outcome when Exception then raise outcome else raise Fila::Error, "unexpected batcher result: #{outcome.inspect}" end @@ -67,7 +64,7 @@ def close def run_loop case @mode - when :auto then run_auto_loop + when :auto then run_auto_loop when :linger then run_linger_loop end end @@ -116,9 +113,9 @@ def run_linger_loop def drain_nonblocking(batch) while batch.size < @max_batch_size begin - item = @queue.pop(true) # non_block = true + item = @queue.pop(true) if item == :shutdown - @queue.push(:shutdown) # re-enqueue so the loop sees it + @queue.push(:shutdown) break end batch << item @@ -128,36 +125,47 @@ def drain_nonblocking(batch) end end - # Flush a batch of items via the unified Enqueue RPC. - def flush_batch(items) - req = ::Fila::V1::EnqueueRequest.new(messages: items.map(&:message)) - results = @stub.enqueue(req, metadata: @metadata).results - - items.each_with_index do |item, idx| - item.result_queue.push(result_to_outcome(results[idx])) + # Flush a batch of items via the FIBP transport. + # Groups items by queue to produce one frame per queue. 
+ def flush_batch(items) # rubocop:disable Metrics/AbcSize + # Group by queue, preserving per-item result queues + groups = items.each_with_index.group_by { |item, _| item.message[:queue] } + + groups.each do |queue, indexed_items| + items_only = indexed_items.map(&:first) + msgs = items_only.map(&:message) + payload = Codec.encode_enqueue(queue, msgs) + resp = @transport.request(Transport::OP_ENQUEUE, payload) + results = Codec.decode_enqueue_response(resp) + + items_only.each_with_index do |item, idx| + item.result_queue.push(result_to_outcome(results[idx])) + end end - rescue GRPC::BadStatus => e - broadcast_error(items, RPCError.new(e.code, e.details)) + rescue Transport::ConnectionClosed => e + broadcast_error(items, RPCError.new(0, "connection closed: #{e.message}")) rescue StandardError => e broadcast_error(items, Fila::Error.new(e.message)) end - # Convert a single proto EnqueueResult into a String (message_id) or Exception. + # Convert an EnqueueResult into a String (message_id) or Exception. def result_to_outcome(result) return Fila::Error.new('no result from server') if result.nil? - return result.message_id if result.result == :message_id + return result.message_id if result.success? 
- err = result.error - case err.code - when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND - QueueNotFoundError.new("enqueue: #{err.message}") + if result.error_code == Transport::ERR_QUEUE_NOT_FOUND + QueueNotFoundError.new("enqueue: #{result.error}") else - RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message) + RPCError.new(result.error_code.to_i, "enqueue: #{result.error}") end end def broadcast_error(items, err) - items.each { |item| item.result_queue.push(err) } + items.each do |item| + item.result_queue.push(err) + rescue StandardError + nil + end end def current_time_ms @@ -173,7 +181,7 @@ def pop_with_timeout(timeout_ms) rescue ThreadError raise if current_time_ms >= deadline - sleep(0.001) # 1ms polling interval + sleep(0.001) end end end diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 53d6cff..6a731c1 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -1,20 +1,14 @@ # frozen_string_literal: true -require 'grpc' - -# Add proto directory to load path so generated requires resolve correctly. -$LOAD_PATH.unshift(File.expand_path('proto', __dir__)) unless $LOAD_PATH.include?(File.expand_path('proto', __dir__)) - -require_relative 'proto/fila/v1/service_services_pb' require_relative 'errors' require_relative 'consume_message' require_relative 'enqueue_result' require_relative 'batcher' +require_relative 'transport' +require_relative 'codec' module Fila - # Client for the Fila message broker. - # - # Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. + # Client for the Fila message broker over the FIBP (Fila Binary Protocol). 
# # @example Plain-text, default auto-batching # client = Fila::Client.new("localhost:5555") @@ -57,9 +51,15 @@ def initialize( # rubocop:disable Metrics/ParameterLists batch_size: 100, linger_ms: 10 ) validate_batch_mode(batch_mode) - @api_key = api_key - @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) - @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) + validate_tls_options(tls || ca_cert, client_cert, client_key) + + host, port = parse_addr(addr) + @transport = Transport.new( + host: host, port: port, + tls: tls, ca_cert: ca_cert, + client_cert: client_cert, client_key: client_key, + api_key: api_key + ) @batcher = start_batcher(batch_mode, max_batch_size, batch_size, linger_ms) end @@ -67,6 +67,8 @@ def initialize( # rubocop:disable Metrics/ParameterLists def close @batcher&.close @batcher = nil + @transport&.close + @transport = nil end # Enqueue a single message to a queue. @@ -80,13 +82,9 @@ def close # @param headers [Hash, nil] optional headers # @return [String] broker-assigned message ID # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def enqueue(queue:, payload:, headers: nil) - msg = ::Fila::V1::EnqueueMessage.new( - queue: queue, - headers: headers || {}, - payload: payload - ) + msg = { queue: queue, payload: payload, headers: headers || {} } if @batcher @batcher.submit(msg) @@ -108,36 +106,29 @@ def enqueue(queue:, payload:, headers: nil) # keys :queue (String), :payload (String), and optionally # :headers (Hash) # @return [Array] - # @raise [RPCError] for transport-level gRPC failures + # @raise [RPCError] for transport-level failures def enqueue_many(messages) - proto_messages = messages.map do |m| - ::Fila::V1::EnqueueMessage.new( - queue: m[:queue], - headers: m[:headers] || {}, - payload: m[:payload] - ) - end - - req = 
::Fila::V1::EnqueueRequest.new(messages: proto_messages) - resp = @stub.enqueue(req, metadata: call_metadata) - - resp.results.map do |r| - if r.result == :message_id - EnqueueResult.new(message_id: r.message_id) - else - EnqueueResult.new(error: r.error.message) - end - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + return [] if messages.empty? + + # The FIBP wire format carries exactly one queue per enqueue frame, + # so messages are grouped by unique queue and sent as one frame + # per queue (see #enqueue_many_raw), and the per-frame results are + # reassembled into the original input order so callers can zip + # them against the messages they passed in. + enqueue_many_raw(messages) + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end # Open a streaming consumer. Yields messages as they arrive. # Returns an Enumerator if no block given. + # + # @param queue [String] queue to consume + # @yield [ConsumeMessage] def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block - consume_with_redirect(queue: queue, redirected: false, &block) + consume_stream(queue, &block) end # Acknowledge a successfully processed message. @@ -145,25 +136,19 @@ def consume(queue:, &block) # @param queue [String] queue the message belongs to # @param msg_id [String] ID of the message to acknowledge # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def ack(queue:, msg_id:) - msg = ::Fila::V1::AckMessage.new(queue: queue, message_id: msg_id) - req = ::Fila::V1::AckRequest.new(messages: [msg]) - resp = @stub.ack(req, metadata: call_metadata) - - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil?
- return if result.result == :success - - err = result.error - case err.code - when :ACK_ERROR_CODE_MESSAGE_NOT_FOUND - raise MessageNotFoundError, "ack: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "ack: #{err.message}") - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + payload = Codec.encode_ack([{ queue: queue, msg_id: msg_id }]) + resp = @transport.request(Transport::OP_ACK, payload) + results = Codec.decode_ack_response(resp) + + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? + return if result[:ok] + + raise_ack_nack_error(result, 'ack') + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end # Negatively acknowledge a message that failed processing. @@ -172,45 +157,49 @@ def ack(queue:, msg_id:) # @param msg_id [String] ID of the message to nack # @param error [String] description of the failure # @raise [MessageNotFoundError] if the message does not exist - # @raise [RPCError] for unexpected gRPC failures + # @raise [RPCError] for unexpected transport failures def nack(queue:, msg_id:, error:) - msg = ::Fila::V1::NackMessage.new(queue: queue, message_id: msg_id, error: error) - req = ::Fila::V1::NackRequest.new(messages: [msg]) - resp = @stub.nack(req, metadata: call_metadata) - - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? 
- return if result.result == :success - - err = result.error - case err.code - when :NACK_ERROR_CODE_MESSAGE_NOT_FOUND - raise MessageNotFoundError, "nack: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "nack: #{err.message}") - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) - end + payload = Codec.encode_nack([{ queue: queue, msg_id: msg_id, error: error }]) + resp = @transport.request(Transport::OP_NACK, payload) + results = Codec.decode_nack_response(resp) - LEADER_ADDR_KEY = 'x-fila-leader-addr' + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? + return if result[:ok] - private_constant :LEADER_ADDR_KEY + raise_ack_nack_error(result, 'nack') + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") + end private + def parse_addr(addr) + # Support "host:port" and IPv6 "[::1]:5555" + pattern = addr.start_with?('[') ? /\A\[(.+)\]:(\d+)\z/ : /\A(.+):(\d+)\z/ + m = addr.match(pattern) + raise ArgumentError, "invalid address #{addr.inspect}, expected host:port" unless m + + [m[1], m[2].to_i] + end + def validate_batch_mode(mode) return if BATCH_MODES.include?(mode) raise ArgumentError, "invalid batch_mode: #{mode.inspect}, must be one of #{BATCH_MODES.inspect}" end + def validate_tls_options(tls_enabled, client_cert, client_key) + return if tls_enabled || (!client_cert && !client_key) + + raise ArgumentError, 'tls: true or ca_cert is required when client_cert or client_key is provided' + end + def start_batcher(mode, max_batch_size, batch_size, linger_ms) return nil if mode == :disabled Batcher.new( - stub: @stub, - metadata: call_metadata, + transport: @transport, mode: mode, max_batch_size: max_batch_size, batch_size: batch_size, @@ -218,98 +207,74 @@ def start_batcher(mode, max_batch_size, batch_size, linger_ms) ) end - # Send a single message via the unified Enqueue RPC. + # Send a single message as a batch of one. 
def enqueue_single(msg) - req = ::Fila::V1::EnqueueRequest.new(messages: [msg]) - resp = @stub.enqueue(req, metadata: call_metadata) + results = enqueue_many_raw([msg]) + result = results.first + raise RPCError.new(0, 'no result from server') if result.nil? - result = resp.results.first - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? + raise_enqueue_error(result) unless result.success? - if result.result == :message_id - result.message_id - else - err = result.error - case err.code - when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND - raise QueueNotFoundError, "enqueue: #{err.message}" - else - raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "enqueue: #{err.message}") - end - end - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + result.message_id + rescue Transport::ConnectionClosed => e + raise RPCError.new(0, "connection closed: #{e.message}") end - def consume_with_redirect(queue:, redirected:, &block) - stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata) - stream.each do |resp| - yield_messages_from_response(resp, &block) + # Raw multi-message enqueue — groups by queue, sends one frame per queue. 
+ def enqueue_many_raw(messages) + # Group by queue to produce per-queue frames + groups = messages.each_with_index.group_by { |m, _| m[:queue] } + # Collect results in original order + all_results = Array.new(messages.size) + + groups.each do |queue, indexed_msgs| + msgs_only = indexed_msgs.map(&:first) + indices = indexed_msgs.map(&:last) + payload = Codec.encode_enqueue(queue, msgs_only) + resp = @transport.request(Transport::OP_ENQUEUE, payload) + results = Codec.decode_enqueue_response(resp) + + indices.each_with_index { |orig_idx, i| all_results[orig_idx] = results[i] } end - rescue GRPC::Cancelled then nil - rescue GRPC::NotFound => e - raise QueueNotFoundError, "consume: #{e.details}" - rescue GRPC::Unavailable => e - raise RPCError.new(e.code, e.details) if (leader_addr = extract_leader_addr(e)).nil? || redirected - - @stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials) - consume_with_redirect(queue: queue, redirected: true, &block) - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) - end - - # Unpack messages from a ConsumeResponse. - def yield_messages_from_response(resp, &block) - resp.messages.each do |msg| - next if msg.nil? || msg.id.empty? 
- - block.call(build_consume_message(msg)) - end - end - - def extract_leader_addr(err) - err.metadata[LEADER_ADDR_KEY] - rescue StandardError - nil - end - - def build_credentials(tls:, ca_cert:, client_cert:, client_key:) - tls_enabled = tls || ca_cert - validate_tls_options(tls_enabled, client_cert, client_key) - return :this_channel_is_insecure unless tls_enabled - build_channel_credentials(ca_cert, client_cert, client_key) + all_results end - def validate_tls_options(tls_enabled, client_cert, client_key) - return if tls_enabled || (!client_cert && !client_key) - - raise ArgumentError, 'tls: true or ca_cert is required when client_cert or client_key is provided' - end - - def build_channel_credentials(ca_cert, client_cert, client_key) - if ca_cert then GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - elsif client_cert && client_key then GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) - else GRPC::Core::ChannelCredentials.new + def consume_stream(queue, &block) + push_q = Queue.new + payload = Codec.encode_consume(queue) + corr_id = @transport.start_consume(payload, push_q) + + loop do + frame = push_q.pop + case frame + when Transport::ConnectionClosed then break + when Exception then raise frame + when String + msg = Codec.decode_consume_push(frame) + block.call(msg) if msg + end end + ensure + @transport.stop_consume(corr_id) if corr_id end - def call_metadata - return {} unless @api_key - - { 'authorization' => "Bearer #{@api_key}" } + def raise_enqueue_error(result) + case result.error_code + when Transport::ERR_QUEUE_NOT_FOUND + raise QueueNotFoundError, "enqueue: #{result.error}" + else + raise RPCError.new(result.error_code.to_i, "enqueue: #{result.error}") + end end - def build_consume_message(msg) - metadata = msg.metadata - ConsumeMessage.new( - id: msg.id, - headers: msg.headers.to_h, - payload: msg.payload, - fairness_key: metadata&.fairness_key.to_s, - attempt_count: metadata&.attempt_count.to_i, - queue: 
metadata&.queue_id.to_s - ) + def raise_ack_nack_error(result, operation) + case result[:err_code] + when Transport::ERR_MESSAGE_NOT_FOUND + raise MessageNotFoundError, "#{operation}: #{result[:err_msg]}" + else + raise RPCError.new(result[:err_code], "#{operation}: #{result[:err_msg]}") + end end end end diff --git a/lib/fila/codec.rb b/lib/fila/codec.rb new file mode 100644 index 0000000..fb24b38 --- /dev/null +++ b/lib/fila/codec.rb @@ -0,0 +1,211 @@ +# frozen_string_literal: true + +module Fila + # Encoding and decoding helpers for the FIBP binary wire format. + # + # All strings are UTF-8; all lengths and integers are big-endian. + # + # @api private + module Codec # rubocop:disable Metrics/ModuleLength + module_function + + # ----------------------------------------------------------------------- + # Enqueue request + # + # queue_len:u16BE | queue:utf8 + # msg_count:u16BE + # messages... (each: header_count:u8 | + # headers: (key_len:u16BE+key, val_len:u16BE+val)* | + # payload_len:u32BE | payload) + # ----------------------------------------------------------------------- + + # @param queue [String] + # @param messages [Array] each has :payload and optional :headers + # @return [String] binary payload + def encode_enqueue(queue, messages) + queue_b = queue.encode('UTF-8').b + buf = [queue_b.bytesize].pack('n') + queue_b + buf += [messages.size].pack('n') + messages.each { |msg| buf += encode_message(msg) } + buf + end + + # @param payload [String] raw binary response payload + # @return [Array] + def decode_enqueue_response(payload) + pos = 0 + count, pos = read_u16(payload, pos) + results = [] + count.times do + ok, pos = read_u8(payload, pos) + if ok == 1 + id_len, pos = read_u16(payload, pos) + msg_id = payload.byteslice(pos, id_len).force_encoding('UTF-8') + pos += id_len + results << EnqueueResult.new(message_id: msg_id) + else + err_code, pos = read_u16(payload, pos) + err_len, pos = read_u16(payload, pos) + err_msg = payload.byteslice(pos, 
err_len).force_encoding('UTF-8') + pos += err_len + results << EnqueueResult.new(error: err_msg, error_code: err_code) + end + end + results + end + + # ----------------------------------------------------------------------- + # Consume request + # + # queue_len:u16BE | queue:utf8 | initial_credits:u32BE + # ----------------------------------------------------------------------- + + # @param queue [String] + # @param initial_credits [Integer] + # @return [String] binary payload + def encode_consume(queue, initial_credits: 256) + queue_b = queue.encode('UTF-8').b + [queue_b.bytesize].pack('n') + queue_b + [initial_credits].pack('N') + end + + # Decode a server-push consume message frame payload. + # + # Frame payload: msg_count:u16BE | messages... + # Each message: msg_id_len:u16BE+msg_id | fk_len:u16BE+fk | + # attempt_count:u32BE | queue_id_len:u16BE+queue_id | + # header_count:u8 | headers | payload_len:u32BE | payload + # + # @param payload [String] raw binary frame payload + # @return [ConsumeMessage, nil] first message only (FIXME(review): msg_count is read but ignored, so any further messages in a multi-message frame are silently dropped — confirm the server pushes exactly one message per frame, or loop here) + def decode_consume_push(payload) + pos = 0 + _msg_count, pos = read_u16(payload, pos) + msg_id, pos = read_str16(payload, pos) + fairness_key, pos = read_str16(payload, pos) + attempt_count, pos = read_u32(payload, pos) + queue_id, pos = read_str16(payload, pos) + headers, pos = read_headers(payload, pos) + pay_len, pos = read_u32(payload, pos) + body = payload.byteslice(pos, pay_len) + + ConsumeMessage.new( + id: msg_id, + headers: headers, + payload: body, + fairness_key: fairness_key, + attempt_count: attempt_count, + queue: queue_id + ) + end + + # ----------------------------------------------------------------------- + # Ack request + # + # item_count:u16BE + # items...: queue_len:u16BE+queue + msg_id_len:u16BE+msg_id + # ----------------------------------------------------------------------- + + # @param items [Array] each has :queue and :msg_id + # @return [String] binary payload + def encode_ack(items) + buf =
[items.size].pack('n') + items.each do |item| + buf += encode_str16(item[:queue]) + buf += encode_str16(item[:msg_id]) + end + buf + end + + # Decode an ack response. + # Response: item_count:u16 | items...: ok:u8 | if !ok: err_code:u16+err_len:u16+err_msg + # + # @param payload [String] + # @return [Array] each has :ok and optional :err_code, :err_msg + def decode_ack_response(payload) + pos = 0 + count, pos = read_u16(payload, pos) + results = [] + count.times do + ok, pos = read_u8(payload, pos) + if ok == 1 + results << { ok: true } + else + err_code, pos = read_u16(payload, pos) + err_len, pos = read_u16(payload, pos) + err_msg = payload.byteslice(pos, err_len).force_encoding('UTF-8') + pos += err_len + results << { ok: false, err_code: err_code, err_msg: err_msg } + end + end + results + end + + # ----------------------------------------------------------------------- + # Nack request + # + # Same as Ack but each item also has: err_len:u16BE+err_msg + # ----------------------------------------------------------------------- + + # @param items [Array] each has :queue, :msg_id, :error + # @return [String] binary payload + def encode_nack(items) + buf = [items.size].pack('n') + items.each do |item| + buf += encode_str16(item[:queue]) + buf += encode_str16(item[:msg_id]) + buf += encode_str16(item[:error].to_s) + end + buf + end + + # Decode a nack response (same shape as ack response). 
+ alias decode_nack_response decode_ack_response + + # NOTE: deliberately NOT `private` here — a bare `private` cancels module_function mode, so the singleton copies of the public codec methods (e.g. Codec.encode_enqueue) could no longer reach these helpers and would raise NameError at runtime. + + def encode_message(msg) + headers = msg[:headers] || {} + buf = [headers.size].pack('C') + headers.each do |key, val| + buf += encode_str16(key.to_s) + buf += encode_str16(val.to_s) + end + payload_b = (msg[:payload] || '').b + buf += [payload_b.bytesize].pack('N') + payload_b + buf + end + + def encode_str16(str) + bytes = str.encode('UTF-8').b + [bytes.bytesize].pack('n') + bytes + end + + def read_str16(buf, pos) + len, pos = read_u16(buf, pos) + [buf.byteslice(pos, len).force_encoding('UTF-8'), pos + len] + end + + def read_headers(buf, pos) + count, pos = read_u8(buf, pos) + headers = {} + count.times do + key, pos = read_str16(buf, pos) + val, pos = read_str16(buf, pos) + headers[key] = val + end + [headers, pos] + end + + def read_u8(buf, pos) + [buf.getbyte(pos), pos + 1] + end + + def read_u16(buf, pos) + [buf.byteslice(pos, 2).unpack1('n'), pos + 2] + end + + def read_u32(buf, pos) + [buf.byteslice(pos, 4).unpack1('N'), pos + 4] + end + end +end diff --git a/lib/fila/enqueue_result.rb b/lib/fila/enqueue_result.rb index ce8b3db..51b4cbe 100644 --- a/lib/fila/enqueue_result.rb +++ b/lib/fila/enqueue_result.rb @@ -22,11 +22,16 @@ class EnqueueResult # @return [String, nil] error description on failure attr_reader :error + # @return [Integer, nil] FIBP error code on failure (nil on success) + attr_reader :error_code + # @param message_id [String, nil] message ID if successful # @param error [String, nil] error string if failed - def initialize(message_id: nil, error: nil) + # @param error_code [Integer, nil] FIBP error code if failed + def initialize(message_id: nil, error: nil, error_code: nil) @message_id = message_id - @error = error + @error = error + @error_code = error_code end # @return [Boolean] true if the message was successfully enqueued diff --git a/lib/fila/errors.rb b/lib/fila/errors.rb index 2f3dcb8..13dcef4 100644 --- a/lib/fila/errors.rb +++ b/lib/fila/errors.rb @@ -10,12 +10,12
@@ class QueueNotFoundError < Error; end # Raised when the specified message does not exist. class MessageNotFoundError < Error; end - # Raised for unexpected gRPC failures, preserving status code and message. + # Raised for unexpected transport failures, preserving an error code and message. class RPCError < Error - # @return [Integer] gRPC status code + # @return [Integer] error code (FIBP error code or 0 for connection errors) attr_reader :code - # @param code [Integer] gRPC status code + # @param code [Integer] error code # @param message [String] error message def initialize(code, message) @code = code diff --git a/lib/fila/proto/fila/v1/admin_pb.rb b/lib/fila/proto/fila/v1/admin_pb.rb deleted file mode 100644 index 5138c83..0000000 --- a/lib/fila/proto/fila/v1/admin_pb.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/admin.proto - -require 'google/protobuf' - - -descriptor_data = "\n\x13\x66ila/v1/admin.proto\x12\x07\x66ila.v1\"H\n\x12\x43reateQueueRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12$\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x14.fila.v1.QueueConfig\"b\n\x0bQueueConfig\x12\x19\n\x11on_enqueue_script\x18\x01 \x01(\t\x12\x19\n\x11on_failure_script\x18\x02 \x01(\t\x12\x1d\n\x15visibility_timeout_ms\x18\x03 \x01(\x04\"\'\n\x13\x43reateQueueResponse\x12\x10\n\x08queue_id\x18\x01 \x01(\t\"#\n\x12\x44\x65leteQueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"\x15\n\x13\x44\x65leteQueueResponse\".\n\x10SetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x13\n\x11SetConfigResponse\"\x1f\n\x10GetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\"\n\x11GetConfigResponse\x12\r\n\x05value\x18\x01 \x01(\t\")\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"#\n\x11ListConfigRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\t\"P\n\x12ListConfigResponse\x12%\n\x07\x65ntries\x18\x01 
\x03(\x0b\x32\x14.fila.v1.ConfigEntry\x12\x13\n\x0btotal_count\x18\x02 \x01(\r\" \n\x0fGetStatsRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"b\n\x13PerFairnessKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x15\n\rpending_count\x18\x02 \x01(\x04\x12\x17\n\x0f\x63urrent_deficit\x18\x03 \x01(\x03\x12\x0e\n\x06weight\x18\x04 \x01(\r\"Z\n\x13PerThrottleKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0e\n\x06tokens\x18\x02 \x01(\x01\x12\x17\n\x0frate_per_second\x18\x03 \x01(\x01\x12\r\n\x05\x62urst\x18\x04 \x01(\x01\"\x9f\x02\n\x10GetStatsResponse\x12\r\n\x05\x64\x65pth\x18\x01 \x01(\x04\x12\x11\n\tin_flight\x18\x02 \x01(\x04\x12\x1c\n\x14\x61\x63tive_fairness_keys\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x0f\n\x07quantum\x18\x05 \x01(\r\x12\x33\n\rper_key_stats\x18\x06 \x03(\x0b\x32\x1c.fila.v1.PerFairnessKeyStats\x12\x38\n\x12per_throttle_stats\x18\x07 \x03(\x0b\x32\x1c.fila.v1.PerThrottleKeyStats\x12\x16\n\x0eleader_node_id\x18\x08 \x01(\x04\x12\x19\n\x11replication_count\x18\t \x01(\r\"2\n\x0eRedriveRequest\x12\x11\n\tdlq_queue\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x04\"#\n\x0fRedriveResponse\x12\x10\n\x08redriven\x18\x01 \x01(\x04\"\x13\n\x11ListQueuesRequest\"m\n\tQueueInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x02 \x01(\x04\x12\x11\n\tin_flight\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x16\n\x0eleader_node_id\x18\x05 \x01(\x04\"T\n\x12ListQueuesResponse\x12\"\n\x06queues\x18\x01 \x03(\x0b\x32\x12.fila.v1.QueueInfo\x12\x1a\n\x12\x63luster_node_count\x18\x02 \x01(\r\"Q\n\x13\x43reateApiKeyRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rexpires_at_ms\x18\x02 \x01(\x04\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"J\n\x14\x43reateApiKeyResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"%\n\x13RevokeApiKeyRequest\x12\x0e\n\x06key_id\x18\x01 
\x01(\t\"\x16\n\x14RevokeApiKeyResponse\"\x14\n\x12ListApiKeysRequest\"o\n\nApiKeyInfo\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x15\n\rcreated_at_ms\x18\x03 \x01(\x04\x12\x15\n\rexpires_at_ms\x18\x04 \x01(\x04\x12\x15\n\ris_superadmin\x18\x05 \x01(\x08\"8\n\x13ListApiKeysResponse\x12!\n\x04keys\x18\x01 \x03(\x0b\x32\x13.fila.v1.ApiKeyInfo\".\n\rAclPermission\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0f\n\x07pattern\x18\x02 \x01(\t\"L\n\rSetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\"\x10\n\x0eSetAclResponse\"\x1f\n\rGetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"d\n\x0eGetAclResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\x32\x8e\x07\n\tFilaAdmin\x12H\n\x0b\x43reateQueue\x12\x1b.fila.v1.CreateQueueRequest\x1a\x1c.fila.v1.CreateQueueResponse\x12H\n\x0b\x44\x65leteQueue\x12\x1b.fila.v1.DeleteQueueRequest\x1a\x1c.fila.v1.DeleteQueueResponse\x12\x42\n\tSetConfig\x12\x19.fila.v1.SetConfigRequest\x1a\x1a.fila.v1.SetConfigResponse\x12\x42\n\tGetConfig\x12\x19.fila.v1.GetConfigRequest\x1a\x1a.fila.v1.GetConfigResponse\x12\x45\n\nListConfig\x12\x1a.fila.v1.ListConfigRequest\x1a\x1b.fila.v1.ListConfigResponse\x12?\n\x08GetStats\x12\x18.fila.v1.GetStatsRequest\x1a\x19.fila.v1.GetStatsResponse\x12<\n\x07Redrive\x12\x17.fila.v1.RedriveRequest\x1a\x18.fila.v1.RedriveResponse\x12\x45\n\nListQueues\x12\x1a.fila.v1.ListQueuesRequest\x1a\x1b.fila.v1.ListQueuesResponse\x12K\n\x0c\x43reateApiKey\x12\x1c.fila.v1.CreateApiKeyRequest\x1a\x1d.fila.v1.CreateApiKeyResponse\x12K\n\x0cRevokeApiKey\x12\x1c.fila.v1.RevokeApiKeyRequest\x1a\x1d.fila.v1.RevokeApiKeyResponse\x12H\n\x0bListApiKeys\x12\x1b.fila.v1.ListApiKeysRequest\x1a\x1c.fila.v1.ListApiKeysResponse\x12\x39\n\x06SetAcl\x12\x16.fila.v1.SetAclRequest\x1a\x17.fila.v1.SetAclResponse\x12\x39\n\x06GetAcl\x12\x16.fila.v1.GetA
clRequest\x1a\x17.fila.v1.GetAclResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - CreateQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueRequest").msgclass - QueueConfig = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueConfig").msgclass - CreateQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateQueueResponse").msgclass - DeleteQueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueRequest").msgclass - DeleteQueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.DeleteQueueResponse").msgclass - SetConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigRequest").msgclass - SetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetConfigResponse").msgclass - GetConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigRequest").msgclass - GetConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetConfigResponse").msgclass - ConfigEntry = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConfigEntry").msgclass - ListConfigRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigRequest").msgclass - ListConfigResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListConfigResponse").msgclass - GetStatsRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsRequest").msgclass - PerFairnessKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerFairnessKeyStats").msgclass - PerThrottleKeyStats = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.PerThrottleKeyStats").msgclass - GetStatsResponse = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetStatsResponse").msgclass - RedriveRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveRequest").msgclass - RedriveResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RedriveResponse").msgclass - ListQueuesRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesRequest").msgclass - QueueInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.QueueInfo").msgclass - ListQueuesResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListQueuesResponse").msgclass - CreateApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyRequest").msgclass - CreateApiKeyResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.CreateApiKeyResponse").msgclass - RevokeApiKeyRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyRequest").msgclass - RevokeApiKeyResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.RevokeApiKeyResponse").msgclass - ListApiKeysRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysRequest").msgclass - ApiKeyInfo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ApiKeyInfo").msgclass - ListApiKeysResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ListApiKeysResponse").msgclass - AclPermission = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AclPermission").msgclass - SetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclRequest").msgclass - SetAclResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.SetAclResponse").msgclass - GetAclRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclRequest").msgclass - GetAclResponse = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.GetAclResponse").msgclass - end -end diff --git a/lib/fila/proto/fila/v1/admin_services_pb.rb b/lib/fila/proto/fila/v1/admin_services_pb.rb deleted file mode 100644 index 71de19e..0000000 --- a/lib/fila/proto/fila/v1/admin_services_pb.rb +++ /dev/null @@ -1,39 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: fila/v1/admin.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/admin_pb' - -module Fila - module V1 - module FilaAdmin - # Admin RPCs for operators and the CLI. - class Service - - include ::GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'fila.v1.FilaAdmin' - - rpc :CreateQueue, ::Fila::V1::CreateQueueRequest, ::Fila::V1::CreateQueueResponse - rpc :DeleteQueue, ::Fila::V1::DeleteQueueRequest, ::Fila::V1::DeleteQueueResponse - rpc :SetConfig, ::Fila::V1::SetConfigRequest, ::Fila::V1::SetConfigResponse - rpc :GetConfig, ::Fila::V1::GetConfigRequest, ::Fila::V1::GetConfigResponse - rpc :ListConfig, ::Fila::V1::ListConfigRequest, ::Fila::V1::ListConfigResponse - rpc :GetStats, ::Fila::V1::GetStatsRequest, ::Fila::V1::GetStatsResponse - rpc :Redrive, ::Fila::V1::RedriveRequest, ::Fila::V1::RedriveResponse - rpc :ListQueues, ::Fila::V1::ListQueuesRequest, ::Fila::V1::ListQueuesResponse - # API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc :CreateApiKey, ::Fila::V1::CreateApiKeyRequest, ::Fila::V1::CreateApiKeyResponse - rpc :RevokeApiKey, ::Fila::V1::RevokeApiKeyRequest, ::Fila::V1::RevokeApiKeyResponse - rpc :ListApiKeys, ::Fila::V1::ListApiKeysRequest, ::Fila::V1::ListApiKeysResponse - # Per-key ACL management. 
- rpc :SetAcl, ::Fila::V1::SetAclRequest, ::Fila::V1::SetAclResponse - rpc :GetAcl, ::Fila::V1::GetAclRequest, ::Fila::V1::GetAclResponse - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/lib/fila/proto/fila/v1/messages_pb.rb b/lib/fila/proto/fila/v1/messages_pb.rb deleted file mode 100644 index ae3674f..0000000 --- a/lib/fila/proto/fila/v1/messages_pb.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/messages.proto - -require 'google/protobuf' - -require 'google/protobuf/timestamp_pb' - - -descriptor_data = "\n\x16\x66ila/v1/messages.proto\x12\x07\x66ila.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe2\x01\n\x07Message\x12\n\n\x02id\x18\x01 \x01(\t\x12.\n\x07headers\x18\x02 \x03(\x0b\x32\x1d.fila.v1.Message.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x12*\n\x08metadata\x18\x04 \x01(\x0b\x32\x18.fila.v1.MessageMetadata\x12.\n\ntimestamps\x18\x05 \x01(\x0b\x32\x1a.fila.v1.MessageTimestamps\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"w\n\x0fMessageMetadata\x12\x14\n\x0c\x66\x61irness_key\x18\x01 \x01(\t\x12\x0e\n\x06weight\x18\x02 \x01(\r\x12\x15\n\rthrottle_keys\x18\x03 \x03(\t\x12\x15\n\rattempt_count\x18\x04 \x01(\r\x12\x10\n\x08queue_id\x18\x05 \x01(\t\"s\n\x11MessageTimestamps\x12/\n\x0b\x65nqueued_at\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12-\n\tleased_at\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - Message = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.Message").msgclass - MessageMetadata = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageMetadata").msgclass - MessageTimestamps = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.MessageTimestamps").msgclass - end 
-end diff --git a/lib/fila/proto/fila/v1/service_pb.rb b/lib/fila/proto/fila/v1/service_pb.rb deleted file mode 100644 index 9751f67..0000000 --- a/lib/fila/proto/fila/v1/service_pb.rb +++ /dev/null @@ -1,42 +0,0 @@ -# frozen_string_literal: true -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: fila/v1/service.proto - -require 'google/protobuf' - -require 'fila/v1/messages_pb' - - -descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueMessage.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\";\n\x0e\x45nqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\"W\n\rEnqueueResult\x12\x14\n\nmessage_id\x18\x01 \x01(\tH\x00\x12&\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x15.fila.v1.EnqueueErrorH\x00\x42\x08\n\x06result\"H\n\x0c\x45nqueueError\x12\'\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x19.fila.v1.EnqueueErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\":\n\x0f\x45nqueueResponse\x12\'\n\x07results\x18\x01 \x03(\x0b\x32\x16.fila.v1.EnqueueResult\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"5\n\x0f\x43onsumeResponse\x12\"\n\x08messages\x18\x01 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"3\n\nAckRequest\x12%\n\x08messages\x18\x01 \x03(\x0b\x32\x13.fila.v1.AckMessage\"a\n\tAckResult\x12&\n\x07success\x18\x01 \x01(\x0b\x32\x13.fila.v1.AckSuccessH\x00\x12\"\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x11.fila.v1.AckErrorH\x00\x42\x08\n\x06result\"\x0c\n\nAckSuccess\"@\n\x08\x41\x63kError\x12#\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x15.fila.v1.AckErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"2\n\x0b\x41\x63kResponse\x12#\n\x07results\x18\x01 
\x03(\x0b\x32\x12.fila.v1.AckResult\"?\n\x0bNackMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"5\n\x0bNackRequest\x12&\n\x08messages\x18\x01 \x03(\x0b\x32\x14.fila.v1.NackMessage\"d\n\nNackResult\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x14.fila.v1.NackSuccessH\x00\x12#\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x12.fila.v1.NackErrorH\x00\x42\x08\n\x06result\"\r\n\x0bNackSuccess\"B\n\tNackError\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.fila.v1.NackErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"4\n\x0cNackResponse\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.fila.v1.NackResult\"Z\n\x14StreamEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\x12\x17\n\x0fsequence_number\x18\x02 \x01(\x04\"Y\n\x15StreamEnqueueResponse\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x04\x12\'\n\x07results\x18\x02 \x03(\x0b\x32\x16.fila.v1.EnqueueResult*\xc4\x01\n\x10\x45nqueueErrorCode\x12\"\n\x1e\x45NQUEUE_ERROR_CODE_UNSPECIFIED\x10\x00\x12&\n\"ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND\x10\x01\x12\x1e\n\x1a\x45NQUEUE_ERROR_CODE_STORAGE\x10\x02\x12\x1a\n\x16\x45NQUEUE_ERROR_CODE_LUA\x10\x03\x12(\n$ENQUEUE_ERROR_CODE_PERMISSION_DENIED\x10\x04*\x96\x01\n\x0c\x41\x63kErrorCode\x12\x1e\n\x1a\x41\x43K_ERROR_CODE_UNSPECIFIED\x10\x00\x12$\n ACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1a\n\x16\x41\x43K_ERROR_CODE_STORAGE\x10\x02\x12$\n 
ACK_ERROR_CODE_PERMISSION_DENIED\x10\x03*\x9b\x01\n\rNackErrorCode\x12\x1f\n\x1bNACK_ERROR_CODE_UNSPECIFIED\x10\x00\x12%\n!NACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1b\n\x17NACK_ERROR_CODE_STORAGE\x10\x02\x12%\n!NACK_ERROR_CODE_PERMISSION_DENIED\x10\x03\x32\xc6\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12R\n\rStreamEnqueue\x12\x1d.fila.v1.StreamEnqueueRequest\x1a\x1e.fila.v1.StreamEnqueueResponse(\x01\x30\x01\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" - -pool = ::Google::Protobuf::DescriptorPool.generated_pool -pool.add_serialized_file(descriptor_data) - -module Fila - module V1 - EnqueueMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueMessage").msgclass - EnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueRequest").msgclass - EnqueueResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResult").msgclass - EnqueueError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueError").msgclass - EnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResponse").msgclass - ConsumeRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeRequest").msgclass - ConsumeResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeResponse").msgclass - AckMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckMessage").msgclass - AckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckRequest").msgclass - AckResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResult").msgclass - AckSuccess = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckSuccess").msgclass - AckError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckError").msgclass - AckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResponse").msgclass - NackMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackMessage").msgclass - NackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackRequest").msgclass - NackResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResult").msgclass - NackSuccess = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackSuccess").msgclass - NackError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackError").msgclass - NackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResponse").msgclass - StreamEnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueRequest").msgclass - StreamEnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueResponse").msgclass - EnqueueErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueErrorCode").enummodule - AckErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckErrorCode").enummodule - NackErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackErrorCode").enummodule - end -end diff --git a/lib/fila/proto/fila/v1/service_services_pb.rb b/lib/fila/proto/fila/v1/service_services_pb.rb deleted file mode 100644 index 93d38ab..0000000 --- a/lib/fila/proto/fila/v1/service_services_pb.rb +++ /dev/null @@ -1,29 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! 
-# Source: fila/v1/service.proto for package 'fila.v1' - -require 'grpc' -require 'fila/v1/service_pb' - -module Fila - module V1 - module FilaService - # Hot-path RPCs for producers and consumers. - class Service - - include ::GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'fila.v1.FilaService' - - rpc :Enqueue, ::Fila::V1::EnqueueRequest, ::Fila::V1::EnqueueResponse - rpc :StreamEnqueue, stream(::Fila::V1::StreamEnqueueRequest), stream(::Fila::V1::StreamEnqueueResponse) - rpc :Consume, ::Fila::V1::ConsumeRequest, stream(::Fila::V1::ConsumeResponse) - rpc :Ack, ::Fila::V1::AckRequest, ::Fila::V1::AckResponse - rpc :Nack, ::Fila::V1::NackRequest, ::Fila::V1::NackResponse - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/lib/fila/transport.rb b/lib/fila/transport.rb new file mode 100644 index 0000000..1f29944 --- /dev/null +++ b/lib/fila/transport.rb @@ -0,0 +1,276 @@ +# frozen_string_literal: true + +require 'socket' +require 'openssl' + +module Fila + # Low-level FIBP (Fila Binary Protocol) transport. 
+ # + # Manages a single TCP connection with: + # - Handshake on connect + # - Length-prefixed frame read/write + # - Correlation-ID-based request/response multiplexing + # - A reader thread that dispatches incoming frames + # - Optional TLS via OpenSSL::SSL + # - API key authentication via AUTH frame + # + # @api private + class Transport # rubocop:disable Metrics/ClassLength + HANDSHAKE = "FIBP\x01\x00".b.freeze + HEADER_SIZE = 6 # flags:u8 + op:u8 + corr_id:u32 + + # Op codes + OP_ENQUEUE = 0x01 + OP_CONSUME = 0x02 + OP_ACK = 0x03 + OP_NACK = 0x04 + OP_AUTH = 0x30 + OP_FLOW = 0x20 + OP_HEARTBEAT = 0x21 + OP_ERROR = 0xFE + OP_GOAWAY = 0xFF + + # Frame flags + FLAG_SERVER_PUSH = 0x04 # bit 2: server-push message frame + + # Error codes returned by the server inside ERROR frames + ERR_QUEUE_NOT_FOUND = 1 + ERR_MESSAGE_NOT_FOUND = 2 + ERR_UNAUTHENTICATED = 3 + + # Sentinel raised when the connection is closed. + class ConnectionClosed < StandardError; end + + # @param host [String] + # @param port [Integer] + # @param tls [Boolean] + # @param ca_cert [String, nil] PEM CA cert + # @param client_cert [String, nil] PEM client cert (mTLS) + # @param client_key [String, nil] PEM client key (mTLS) + # @param api_key [String, nil] + def initialize( # rubocop:disable Metrics/ParameterLists + host:, port:, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil + ) + @host = host + @port = port + @tls = tls || ca_cert + @ca_cert = ca_cert + @client_cert = client_cert + @client_key = client_key + @api_key = api_key + + @mutex = Mutex.new + @corr_seq = 0 + @pending = {} # corr_id => Queue + @push_queue = nil # set during consume_stream + @closed = false + + @socket = connect_socket + perform_handshake + start_reader + send_auth if @api_key + end + + # Send a request frame and block until the response arrives. 
+ # + # @param opcode [Integer] op code + # @param payload [String] binary payload (encoding: BINARY) + # @return [String] response payload + # @raise [Fila::QueueNotFoundError, Fila::MessageNotFoundError, Fila::RPCError, ConnectionClosed] + def request(opcode, payload) + corr_id = next_corr_id + result_q = Queue.new + + @mutex.synchronize do + raise ConnectionClosed, 'connection is closed' if @closed + + @pending[corr_id] = result_q + end + + write_frame(opcode, corr_id, payload) + + outcome = result_q.pop + case outcome + when String then outcome + when Exception then raise outcome + else raise RPCError.new(0, "unexpected transport result: #{outcome.inspect}") + end + end + + # Register a push queue for consume-stream server-push frames. + # Returns corr_id used to issue the consume request. + # + # @param payload [String] consume request payload + # @param push_q [Queue] messages pushed here as they arrive + # @return [Integer] corr_id + def start_consume(payload, push_q) + corr_id = next_corr_id + + @mutex.synchronize do + raise ConnectionClosed, 'connection is closed' if @closed + + @pending[corr_id] = push_q + end + + write_frame(OP_CONSUME, corr_id, payload) + corr_id + end + + # Remove the consume push queue and stop dispatching to it. + def stop_consume(corr_id) + @mutex.synchronize { @pending.delete(corr_id) } + end + + # Close the connection. 
+ def close + @mutex.synchronize { @closed = true } + begin + @socket.close + rescue IOError, OpenSSL::SSL::SSLError + nil + end + @reader_thread&.join(2) + drain_pending + end + + private + + def connect_socket + raw = TCPSocket.new(@host, @port) + raw.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + return raw unless @tls + + ctx = OpenSSL::SSL::SSLContext.new + ctx.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER) + + if @ca_cert + store = OpenSSL::X509::Store.new + store.add_cert(OpenSSL::X509::Certificate.new(@ca_cert)) + ctx.cert_store = store + end + # else: use default system trust store + + if @client_cert && @client_key + ctx.cert = OpenSSL::X509::Certificate.new(@client_cert) + ctx.key = OpenSSL::PKey::RSA.new(@client_key) + end + + ssl = OpenSSL::SSL::SSLSocket.new(raw, ctx) + ssl.hostname = @host + ssl.connect + ssl + end + + def perform_handshake + write_raw(HANDSHAKE) + echo = read_raw(6) + raise RPCError.new(0, "FIBP handshake failed: got #{echo.inspect}") unless echo == HANDSHAKE + end + + def send_auth + key_bytes = @api_key.encode('UTF-8').b + payload = [key_bytes.bytesize].pack('n') + key_bytes + request(OP_AUTH, payload) + end + + def start_reader + @reader_thread = Thread.new { reader_loop } + @reader_thread.abort_on_exception = false + end + + def reader_loop + loop do + # Read 4-byte length prefix + len_bytes = read_raw(4) + break unless len_bytes + + total_len = len_bytes.unpack1('N') + frame = read_raw(total_len) + break unless frame && frame.bytesize == total_len + + dispatch_frame(frame) + end + rescue IOError, Errno::ECONNRESET, Errno::EPIPE, OpenSSL::SSL::SSLError + # connection dropped + ensure + @mutex.synchronize { @closed = true } + drain_pending + end + + def dispatch_frame(frame) + flags = frame.getbyte(0) + opcode = frame.getbyte(1) + corr_id = frame.byteslice(2, 4).unpack1('N') + payload = frame.byteslice(HEADER_SIZE, frame.bytesize - HEADER_SIZE) || ''.b + + dest = @mutex.synchronize { @pending[corr_id] } + return 
unless dest + + return drain_pending if opcode == OP_GOAWAY + + push_only = flags.anybits?(FLAG_SERVER_PUSH) + result = opcode == OP_ERROR ? parse_error_frame(payload) : payload + + @mutex.synchronize { @pending.delete(corr_id) } unless push_only + dest.push(result) + end + + def parse_error_frame(payload) + err_code = payload.byteslice(0, 2).unpack1('n') + msg_len = payload.byteslice(2, 2).unpack1('n') + msg = payload.byteslice(4, msg_len).force_encoding('UTF-8') + case err_code + when ERR_QUEUE_NOT_FOUND then QueueNotFoundError.new(msg) + when ERR_MESSAGE_NOT_FOUND then MessageNotFoundError.new(msg) + when ERR_UNAUTHENTICATED then RPCError.new(ERR_UNAUTHENTICATED, msg) + else RPCError.new(err_code, msg) + end + end + + def write_frame(opcode, corr_id, payload) + flags = 0 + header = [flags, opcode, corr_id].pack('CCN') + body = header + payload.b + write_raw([body.bytesize].pack('N') + body) + end + + def write_raw(bytes) + @mutex.synchronize do + @socket.write(bytes) + end + rescue IOError, Errno::EPIPE, Errno::ECONNRESET, OpenSSL::SSL::SSLError => e + raise ConnectionClosed, "write failed: #{e.message}" + end + + def read_raw(num_bytes) + buf = ''.b + while buf.bytesize < num_bytes + chunk = @socket.read(num_bytes - buf.bytesize) + return nil if chunk.nil? || chunk.empty? 
+ + buf << chunk + end + buf + end + + def next_corr_id + @mutex.synchronize do + @corr_seq = (@corr_seq + 1) & 0xFFFFFFFF + @corr_seq + end + end + + def drain_pending + err = ConnectionClosed.new('connection closed') + @mutex.synchronize do + @pending.each_value do |queue| + queue.push(err) + rescue StandardError + nil + end + @pending.clear + end + end + end +end diff --git a/lib/fila/version.rb b/lib/fila/version.rb index ecc3d06..efdd9b7 100644 --- a/lib/fila/version.rb +++ b/lib/fila/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Fila - VERSION = '0.4.0' + VERSION = '0.5.0' end diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto deleted file mode 100644 index 886e58d..0000000 --- a/proto/fila/v1/admin.proto +++ /dev/null @@ -1,197 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -// Admin RPCs for operators and the CLI. -service FilaAdmin { - rpc CreateQueue(CreateQueueRequest) returns (CreateQueueResponse); - rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); - rpc SetConfig(SetConfigRequest) returns (SetConfigResponse); - rpc GetConfig(GetConfigRequest) returns (GetConfigResponse); - rpc ListConfig(ListConfigRequest) returns (ListConfigResponse); - rpc GetStats(GetStatsRequest) returns (GetStatsResponse); - rpc Redrive(RedriveRequest) returns (RedriveResponse); - rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - - // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); - rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); - rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); - - // Per-key ACL management. 
- rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); -} - -message CreateQueueRequest { - string name = 1; - QueueConfig config = 2; -} - -message QueueConfig { - string on_enqueue_script = 1; - string on_failure_script = 2; - uint64 visibility_timeout_ms = 3; -} - -message CreateQueueResponse { - string queue_id = 1; -} - -message DeleteQueueRequest { - string queue = 1; -} - -message DeleteQueueResponse {} - -message SetConfigRequest { - string key = 1; - string value = 2; -} - -message SetConfigResponse {} - -message GetConfigRequest { - string key = 1; -} - -message GetConfigResponse { - string value = 1; -} - -message ConfigEntry { - string key = 1; - string value = 2; -} - -message ListConfigRequest { - string prefix = 1; -} - -message ListConfigResponse { - repeated ConfigEntry entries = 1; - uint32 total_count = 2; -} - -message GetStatsRequest { - string queue = 1; -} - -message PerFairnessKeyStats { - string key = 1; - uint64 pending_count = 2; - int64 current_deficit = 3; - uint32 weight = 4; -} - -message PerThrottleKeyStats { - string key = 1; - double tokens = 2; - double rate_per_second = 3; - double burst = 4; -} - -message GetStatsResponse { - uint64 depth = 1; - uint64 in_flight = 2; - uint64 active_fairness_keys = 3; - uint32 active_consumers = 4; - uint32 quantum = 5; - repeated PerFairnessKeyStats per_key_stats = 6; - repeated PerThrottleKeyStats per_throttle_stats = 7; - // Cluster fields (0 when not in cluster mode). 
- uint64 leader_node_id = 8; - uint32 replication_count = 9; -} - -message RedriveRequest { - string dlq_queue = 1; - uint64 count = 2; -} - -message RedriveResponse { - uint64 redriven = 1; -} - -message ListQueuesRequest {} - -message QueueInfo { - string name = 1; - uint64 depth = 2; - uint64 in_flight = 3; - uint32 active_consumers = 4; - uint64 leader_node_id = 5; -} - -message ListQueuesResponse { - repeated QueueInfo queues = 1; - uint32 cluster_node_count = 2; -} - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. -message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). 
- string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/messages.proto b/proto/fila/v1/messages.proto deleted file mode 100644 index a0709cf..0000000 --- a/proto/fila/v1/messages.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "google/protobuf/timestamp.proto"; - -// Core message envelope persisted in the broker. -message Message { - string id = 1; - map headers = 2; - bytes payload = 3; - MessageMetadata metadata = 4; - MessageTimestamps timestamps = 5; -} - -// Broker-assigned scheduling metadata. -message MessageMetadata { - string fairness_key = 1; - uint32 weight = 2; - repeated string throttle_keys = 3; - uint32 attempt_count = 4; - string queue_id = 5; -} - -// Lifecycle timestamps attached to every message. -message MessageTimestamps { - google.protobuf.Timestamp enqueued_at = 1; - google.protobuf.Timestamp leased_at = 2; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto deleted file mode 100644 index 7d1db79..0000000 --- a/proto/fila/v1/service.proto +++ /dev/null @@ -1,142 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "fila/v1/messages.proto"; - -// Hot-path RPCs for producers and consumers. -service FilaService { - rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); - rpc Ack(AckRequest) returns (AckResponse); - rpc Nack(NackRequest) returns (NackResponse); -} - -// Individual message to enqueue. -message EnqueueMessage { - string queue = 1; - map headers = 2; - bytes payload = 3; -} - -// Enqueue one or more messages. 
-message EnqueueRequest { - repeated EnqueueMessage messages = 1; -} - -// Per-message enqueue result. -message EnqueueResult { - oneof result { - string message_id = 1; - EnqueueError error = 2; - } -} - -// Typed enqueue error with structured error code. -message EnqueueError { - EnqueueErrorCode code = 1; - string message = 2; -} - -enum EnqueueErrorCode { - ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; - ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; - ENQUEUE_ERROR_CODE_STORAGE = 2; - ENQUEUE_ERROR_CODE_LUA = 3; - ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; -} - -// One result per input message. -message EnqueueResponse { - repeated EnqueueResult results = 1; -} - -message ConsumeRequest { - string queue = 1; -} - -message ConsumeResponse { - repeated Message messages = 1; -} - -// Individual ack item. -message AckMessage { - string queue = 1; - string message_id = 2; -} - -message AckRequest { - repeated AckMessage messages = 1; -} - -message AckResult { - oneof result { - AckSuccess success = 1; - AckError error = 2; - } -} - -message AckSuccess {} - -message AckError { - AckErrorCode code = 1; - string message = 2; -} - -enum AckErrorCode { - ACK_ERROR_CODE_UNSPECIFIED = 0; - ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - ACK_ERROR_CODE_STORAGE = 2; - ACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message AckResponse { - repeated AckResult results = 1; -} - -// Individual nack item. 
-message NackMessage { - string queue = 1; - string message_id = 2; - string error = 3; -} - -message NackRequest { - repeated NackMessage messages = 1; -} - -message NackResult { - oneof result { - NackSuccess success = 1; - NackError error = 2; - } -} - -message NackSuccess {} - -message NackError { - NackErrorCode code = 1; - string message = 2; -} - -enum NackErrorCode { - NACK_ERROR_CODE_UNSPECIFIED = 0; - NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - NACK_ERROR_CODE_STORAGE = 2; - NACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message NackResponse { - repeated NackResult results = 1; -} - -// Stream enqueue — per-write batch with sequence tracking. -message StreamEnqueueRequest { - repeated EnqueueMessage messages = 1; - uint64 sequence_number = 2; -} - -message StreamEnqueueResponse { - uint64 sequence_number = 1; - repeated EnqueueResult results = 2; -} diff --git a/test/test_batch.rb b/test/test_batch.rb index 62f1b41..295c6ce 100644 --- a/test/test_batch.rb +++ b/test/test_batch.rb @@ -222,15 +222,6 @@ def test_invalid_batch_mode_raises Fila::Client.new('localhost:5555', batch_mode: :invalid) end end - - def test_valid_batch_modes_accepted - # These should not raise (but won't connect since server isn't on this port). - # Just verify argument validation passes. 
- %i[auto linger disabled].each do |mode| - client = Fila::Client.new('localhost:19999', batch_mode: mode) - client.close - end - end end class TestCloseFlush < Minitest::Test diff --git a/test/test_helper.rb b/test/test_helper.rb index c88a349..3775895 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,19 +3,17 @@ require 'minitest/autorun' require 'tmpdir' require 'socket' -require 'grpc' +require 'openssl' $LOAD_PATH.unshift File.expand_path('../lib', __dir__) -$LOAD_PATH.unshift File.expand_path('../lib/fila/proto', __dir__) require 'fila' -require_relative '../lib/fila/proto/fila/v1/admin_services_pb' FILA_SERVER_BIN = ENV.fetch('FILA_SERVER_BIN') do File.join(__dir__, '..', '..', 'fila', 'target', 'release', 'fila-server') end FILA_SERVER_AVAILABLE = File.exist?(FILA_SERVER_BIN) -module TestServerHelper +module TestServerHelper # rubocop:disable Metrics/ModuleLength def self.find_free_port server = TCPServer.new('127.0.0.1', 0) port = server.addr[1] @@ -28,12 +26,13 @@ def self.find_free_port # @param tls_config [Hash, nil] optional TLS configuration with keys: # :ca_cert_path, :server_cert_path, :server_key_path # @param bootstrap_apikey [String, nil] optional bootstrap API key - # @return [Hash] server info with :addr, :pid, :data_dir, :admin_stub + # @return [Hash] server info with :addr, :host, :port, :pid, :data_dir + # and optional :tls_config, :bootstrap_apikey def self.start(tls_config: nil, bootstrap_apikey: nil) port = find_free_port addr = "127.0.0.1:#{port}" - data_dir = Dir.mktmpdir('fila-test-') + data_dir = Dir.mktmpdir('fila-test-') config_path = File.join(data_dir, 'fila.toml') toml = "[server]\nlisten_addr = \"#{addr}\"\n" @@ -62,56 +61,48 @@ def self.start(tls_config: nil, bootstrap_apikey: nil) ) end - # Build credentials for admin stub. - # client_ca_cert_path is always needed to verify server cert; ca_cert_path is only for mTLS. 
- credentials = :this_channel_is_insecure - if tls_config - ca_path = tls_config[:client_ca_cert_path] || tls_config[:ca_cert_path] - if ca_path - ca_cert = File.read(ca_path) - client_key = tls_config[:client_key_path] ? File.read(tls_config[:client_key_path]) : nil - client_cert = tls_config[:client_cert_path] ? File.read(tls_config[:client_cert_path]) : nil - credentials = GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - end - end + server_info = { + addr: addr, + host: '127.0.0.1', + port: port, + pid: pid, + data_dir: data_dir, + tls_config: tls_config, + bootstrap_apikey: bootstrap_apikey + } - admin_metadata = {} - admin_metadata['authorization'] = "Bearer #{bootstrap_apikey}" if bootstrap_apikey + wait_for_ready(server_info, stderr_path, toml) + server_info + end - # Wait for server ready. + def self.wait_for_ready(server_info, stderr_path, toml) deadline = Time.now + 10 ready = false while Time.now < deadline begin - try_list_queues(addr, credentials: credentials, metadata: admin_metadata) + # Use a plain TCP connect to check the port is accepting connections. + # A full FIBP handshake would fail against a non-FIBP server (e.g. old + # gRPC binary), masking real startup failures. 
+ sock = TCPSocket.new(server_info[:host], server_info[:port]) + sock.close ready = true break - rescue StandardError + rescue SystemCallError sleep 0.05 end end - unless ready - Process.kill('TERM', pid) - Process.wait(pid) - stderr_output = begin - File.read(stderr_path) - rescue StandardError - '' - end - FileUtils.rm_rf(data_dir) - raise "fila-server failed to start within 10s on #{addr}\nConfig:\n#{toml}\nStderr:\n#{stderr_output}" - end + return if ready - admin_stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) - - { - addr: addr, - pid: pid, - data_dir: data_dir, - admin_stub: admin_stub, - admin_metadata: admin_metadata - } + Process.kill('TERM', server_info[:pid]) + Process.wait(server_info[:pid]) + stderr_output = begin + File.read(stderr_path) + rescue StandardError + '' + end + FileUtils.rm_rf(server_info[:data_dir]) + raise "fila-server failed to start within 10s on #{server_info[:addr]}\nConfig:\n#{toml}\nStderr:\n#{stderr_output}" end def self.stop(server) @@ -122,13 +113,41 @@ def self.stop(server) # Process already gone. end - def self.create_queue(server, name) - req = ::Fila::V1::CreateQueueRequest.new(name: name, config: {}) - server[:admin_stub].create_queue(req, metadata: server[:admin_metadata] || {}) + # Build a FIBP transport with appropriate TLS/auth for admin operations. + def self.admin_transport(server) + tc = server[:tls_config] + tls_opts = if tc + ca_path = tc[:client_ca_cert_path] || tc[:ca_cert_path] + { + tls: true, + ca_cert: ca_path ? File.read(ca_path) : nil, + client_cert: tc[:client_cert_path] ? File.read(tc[:client_cert_path]) : nil, + client_key: tc[:client_key_path] ? 
File.read(tc[:client_key_path]) : nil + } + else + { tls: false } + end + + Fila::Transport.new( + host: server[:host], + port: server[:port], + api_key: server[:bootstrap_apikey], + **tls_opts + ) end - def self.try_list_queues(addr, credentials: :this_channel_is_insecure, metadata: {}) - stub = ::Fila::V1::FilaAdmin::Stub.new(addr, credentials) - stub.list_queues(::Fila::V1::ListQueuesRequest.new, metadata: metadata) + # Send a CreateQueue admin frame via FIBP. + OP_CREATE_QUEUE = 0x10 + + def self.create_queue(server, name) + transport = admin_transport(server) + name_b = name.encode('UTF-8').b + payload = [name_b.bytesize].pack('n') + name_b + + [0].pack('n') # config_count: 0 key-value pairs + transport.request(OP_CREATE_QUEUE, payload) + rescue StandardError => e + raise "create_queue #{name.inspect} failed: #{e.message}" + ensure + transport&.close end end diff --git a/test/test_tls_auth.rb b/test/test_tls_auth.rb index 7aad862..a008387 100644 --- a/test/test_tls_auth.rb +++ b/test/test_tls_auth.rb @@ -79,6 +79,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end @@ -94,11 +95,13 @@ def test_enqueue_without_api_key_rejected client_no_key = Fila::Client.new(@server[:addr]) TestServerHelper.create_queue(@server, 'auth-reject-queue') - # Without API key, the server should reject the request with Unauthenticated. + # Without API key, the server should reject with Unauthenticated. 
err = assert_raises(Fila::RPCError) do client_no_key.enqueue(queue: 'auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # GRPC::Core::StatusCodes::UNAUTHENTICATED + assert_equal Fila::Transport::ERR_UNAUTHENTICATED, err.code + ensure + client_no_key&.close end def test_consume_with_api_key @@ -138,6 +141,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -189,6 +193,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -208,8 +213,6 @@ def setup @certs = CertHelper.generate_certs(@cert_dir) @bootstrap_key = 'tls-bootstrap-key-67890' - # Server-only TLS + API key: omit ca_cert_path so server does not require client certs. - # client_ca_cert_path is used by the test client to verify the server cert. @server = TestServerHelper.start( tls_config: { server_cert_path: @certs[:server_cert], @@ -227,6 +230,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server FileUtils.rm_rf(@cert_dir) if @cert_dir end @@ -249,7 +253,9 @@ def test_no_api_key_over_tls_rejected err = assert_raises(Fila::RPCError) do client_no_key.enqueue(queue: 'tls-auth-reject-queue', payload: 'should fail') end - assert_equal 16, err.code # UNAUTHENTICATED + assert_equal Fila::Transport::ERR_UNAUTHENTICATED, err.code + ensure + client_no_key&.close end end @@ -260,6 +266,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end