Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

Ruby client SDK for the [Fila](https://github.com/faisca/fila) message broker.

Uses the FIBP (Fila Binary Protocol) transport — a lightweight length-prefixed binary protocol over raw TCP. No gRPC or protobuf dependencies required.

## Installation

```bash
Expand Down Expand Up @@ -89,7 +91,7 @@ client = Fila::Client.new("localhost:5555",
```ruby
require "fila"

# API key sent as Bearer token on every request.
# API key sent once as an AUTH frame when the connection is established.
client = Fila::Client.new("localhost:5555",
api_key: "fila_your_api_key_here"
)
Expand Down Expand Up @@ -122,7 +124,7 @@ Connect to a Fila broker at the given address (e.g., `"localhost:5555"`).
| `ca_cert:` | `String` or `nil` | PEM-encoded CA certificate for TLS (implies `tls: true`) |
| `client_cert:` | `String` or `nil` | PEM-encoded client certificate for mTLS |
| `client_key:` | `String` or `nil` | PEM-encoded client private key for mTLS |
| `api_key:` | `String` or `nil` | API key for Bearer token authentication |
| `api_key:` | `String` or `nil` | API key for authentication |

When no TLS/auth options are provided, the client connects over plaintext (backward compatible). When `tls: true` is set without `ca_cert:`, the OS system trust store is used for server certificate verification.

Expand All @@ -144,7 +146,7 @@ Negatively acknowledge a failed message. The message is requeued or routed to th

### `client.close`

Close the underlying gRPC channel.
Drain any pending batched messages, then close the underlying TCP connection.

## Error Handling

Expand All @@ -164,6 +166,10 @@ rescue Fila::MessageNotFoundError => e
end
```

## Transport

This SDK uses **FIBP** (Fila Binary Protocol): a length-prefixed binary protocol over raw TCP (or TLS). It has no runtime dependencies beyond Ruby's standard library (`socket`, `openssl`, `thread`).

## License

AGPLv3
7 changes: 3 additions & 4 deletions fila-client.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ Gem::Specification.new do |spec|
spec.version = Fila::VERSION
spec.authors = ['Faisca']
spec.summary = 'Ruby client SDK for the Fila message broker'
spec.description = "Idiomatic Ruby client wrapping Fila's gRPC API for enqueue, consume, ack, and nack operations."
spec.description = 'Ruby client for the Fila message broker using FIBP transport. ' \
'Supports enqueue, consume, ack, nack, TLS/mTLS, and API key auth.'
spec.homepage = 'https://github.com/faiscadev/fila-ruby'
spec.license = 'AGPL-3.0-or-later'
spec.required_ruby_version = '>= 3.1'

spec.files = Dir['lib/**/*.rb', 'proto/**/*.proto', 'LICENSE', 'README.md']
spec.files = Dir['lib/**/*.rb', 'LICENSE', 'README.md']
spec.require_paths = ['lib']

spec.add_dependency 'google-protobuf', '~> 4.0'
spec.add_dependency 'grpc', '~> 1.60'
spec.metadata['rubygems_mfa_required'] = 'true'
end
2 changes: 2 additions & 0 deletions lib/fila.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@
require_relative 'fila/errors'
require_relative 'fila/consume_message'
require_relative 'fila/enqueue_result'
require_relative 'fila/codec'
require_relative 'fila/transport'
require_relative 'fila/batcher'
require_relative 'fila/client'
80 changes: 44 additions & 36 deletions lib/fila/batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@

module Fila
# Background batcher that collects enqueue messages and flushes them
# in batches via the unified Enqueue RPC. Supports auto (opportunistic)
# in batches via the FIBP transport. Supports auto (opportunistic)
# and linger (timer-based) modes.
#
# @api private
class Batcher # rubocop:disable Metrics/ClassLength
# An item queued for batching, pairing a message with its result slot.
BatchItem = Struct.new(:message, :result_queue, keyword_init: true)

# @param stub [Fila::V1::FilaService::Stub] gRPC stub
# @param metadata [Hash] call metadata (auth headers)
# @param transport [Fila::Transport] FIBP transport
# @param mode [Symbol] :auto or :linger
# @param max_batch_size [Integer] cap on batch size (auto mode)
# @param batch_size [Integer] batch size threshold (linger mode)
# @param linger_ms [Integer] linger time in ms (linger mode)
def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10)
@stub = stub
@metadata = metadata
@mode = mode
def initialize(transport:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10)
@transport = transport
@mode = mode
@max_batch_size = mode == :auto ? max_batch_size : batch_size
@linger_ms = linger_ms
@queue = Queue.new
@stopped = false
@mutex = Mutex.new
@linger_ms = linger_ms
@queue = Queue.new
@stopped = false
@mutex = Mutex.new

@thread = Thread.new { run_loop }
@thread.abort_on_exception = true
Expand All @@ -33,10 +31,10 @@ def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, li
# Submit a message for batched sending. Blocks until the batch
# containing this message is flushed and the result is available.
#
# @param message [Fila::V1::EnqueueMessage] the enqueue message
# @param message [Hash] enqueue message with :queue, :payload, :headers
# @return [String] message ID on success
# @raise [Fila::QueueNotFoundError] if the queue does not exist
# @raise [Fila::RPCError] for unexpected gRPC failures
# @raise [Fila::RPCError] for unexpected transport failures
def submit(message)
result_queue = Queue.new
item = BatchItem.new(message: message, result_queue: result_queue)
Expand All @@ -47,10 +45,9 @@ def submit(message)
@queue.push(item)
end

# Block until the batcher flushes our batch and posts the result.
outcome = result_queue.pop
case outcome
when String then outcome
when String then outcome
when Exception then raise outcome
else raise Fila::Error, "unexpected batcher result: #{outcome.inspect}"
end
Expand All @@ -67,7 +64,7 @@ def close

def run_loop
case @mode
when :auto then run_auto_loop
when :auto then run_auto_loop
when :linger then run_linger_loop
end
end
Expand Down Expand Up @@ -116,9 +113,9 @@ def run_linger_loop
def drain_nonblocking(batch)
while batch.size < @max_batch_size
begin
item = @queue.pop(true) # non_block = true
item = @queue.pop(true)
if item == :shutdown
@queue.push(:shutdown) # re-enqueue so the loop sees it
@queue.push(:shutdown)
break
end
batch << item
Expand All @@ -128,36 +125,47 @@ def drain_nonblocking(batch)
end
end

# Flush a batch of items via the unified Enqueue RPC.
def flush_batch(items)
req = ::Fila::V1::EnqueueRequest.new(messages: items.map(&:message))
results = @stub.enqueue(req, metadata: @metadata).results

items.each_with_index do |item, idx|
item.result_queue.push(result_to_outcome(results[idx]))
# Flush a batch of items via the FIBP transport.
# Groups items by queue to produce one frame per queue.
def flush_batch(items) # rubocop:disable Metrics/AbcSize
# Group by queue, preserving per-item result queues
groups = items.each_with_index.group_by { |item, _| item.message[:queue] }

groups.each do |queue, indexed_items|
items_only = indexed_items.map(&:first)
msgs = items_only.map(&:message)
payload = Codec.encode_enqueue(queue, msgs)
resp = @transport.request(Transport::OP_ENQUEUE, payload)
results = Codec.decode_enqueue_response(resp)

items_only.each_with_index do |item, idx|
item.result_queue.push(result_to_outcome(results[idx]))
end
end
rescue GRPC::BadStatus => e
broadcast_error(items, RPCError.new(e.code, e.details))
rescue Transport::ConnectionClosed => e
broadcast_error(items, RPCError.new(0, "connection closed: #{e.message}"))
rescue StandardError => e
broadcast_error(items, Fila::Error.new(e.message))
end

# Convert a single proto EnqueueResult into a String (message_id) or Exception.
# Convert an EnqueueResult into a String (message_id) or Exception.
def result_to_outcome(result)
return Fila::Error.new('no result from server') if result.nil?
return result.message_id if result.result == :message_id
return result.message_id if result.success?

err = result.error
case err.code
when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND
QueueNotFoundError.new("enqueue: #{err.message}")
if result.error_code == Transport::ERR_QUEUE_NOT_FOUND
QueueNotFoundError.new("enqueue: #{result.error}")
else
RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message)
RPCError.new(result.error_code.to_i, "enqueue: #{result.error}")
end
end

def broadcast_error(items, err)
items.each { |item| item.result_queue.push(err) }
items.each do |item|
item.result_queue.push(err)
rescue StandardError
nil
end
end

def current_time_ms
Expand All @@ -173,7 +181,7 @@ def pop_with_timeout(timeout_ms)
rescue ThreadError
raise if current_time_ms >= deadline

sleep(0.001) # 1ms polling interval
sleep(0.001)
end
end
end
Expand Down
Loading
Loading