Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher - * runs on a dedicated daemon thread and flushes RPCs on an executor pool. Uses the unified Enqueue - * RPC with repeated messages for all batch sizes. + * runs on a dedicated daemon thread and flushes frames on an executor pool. + * + *
FIBP enqueue frames carry one queue name per frame, so messages targeting different queues are
+ * split into separate frames.
*/
final class Batcher {
private final LinkedBlockingQueue All integers are big-endian. String lengths are encoded as u16BE unless noted otherwise.
+ *
+ * Wire formats (from fila-core/src/fibp/wire.rs):
+ *
+ * The server sends error messages as plain text. We map them by keyword matching, matching the
+ * server's error message conventions.
+ */
+ static FilaException decodeError(byte[] payload) {
+ String msg = new String(payload, StandardCharsets.UTF_8);
+ String lower = msg.toLowerCase();
+ if (lower.contains("not found") && lower.contains("queue")) {
+ return new QueueNotFoundException(msg);
+ }
+ if (lower.contains("not found") && lower.contains("message")) {
+ return new MessageNotFoundException(msg);
+ }
+ if (lower.contains("not found")) {
+ return new QueueNotFoundException(msg);
+ }
+ if (lower.contains("permission denied") || lower.contains("forbidden")) {
+ return new RpcException(RpcException.Code.PERMISSION_DENIED, msg);
+ }
+ if (lower.contains("authentication required")
+ || lower.contains("unauthenticated")
+ || lower.contains("invalid api key")) {
+ return new RpcException(RpcException.Code.UNAUTHENTICATED, msg);
+ }
+ return new RpcException(RpcException.Code.INTERNAL, msg);
+ }
+
+ // ── Helpers ───────────────────────────────────────────────────────────────
+
+ private static void writeStr16(DataOutputStream dos, String s) throws IOException {
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+ if (bytes.length > 65535) {
+ throw new FilaException(
+ "string too long: " + bytes.length + " bytes exceeds u16 maximum of 65535");
+ }
+ dos.writeShort(bytes.length);
+ dos.write(bytes);
+ }
+
+ static int readU16(byte[] buf, int pos) {
+ return ((buf[pos] & 0xFF) << 8) | (buf[pos + 1] & 0xFF);
+ }
+
+ static int readU32(byte[] buf, int pos) {
+ return ((buf[pos] & 0xFF) << 24)
+ | ((buf[pos + 1] & 0xFF) << 16)
+ | ((buf[pos + 2] & 0xFF) << 8)
+ | (buf[pos + 3] & 0xFF);
+ }
+}
diff --git a/src/main/java/dev/faisca/fila/FibpConnection.java b/src/main/java/dev/faisca/fila/FibpConnection.java
new file mode 100644
index 0000000..9fe5167
--- /dev/null
+++ b/src/main/java/dev/faisca/fila/FibpConnection.java
@@ -0,0 +1,387 @@
+package dev.faisca.fila;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.net.ssl.SSLSocket;
+
+/**
+ * Low-level FIBP (Fila Binary Protocol) connection over a single TCP socket.
+ *
+ * Handles framing, handshake, correlation-ID multiplexing, heartbeats, and AUTH. A dedicated
+ * reader thread dispatches incoming frames to pending {@link CompletableFuture}s or registered push
+ * handlers.
+ *
+ * Thread-safety: all public methods are safe to call from multiple threads.
+ */
+final class FibpConnection implements AutoCloseable {
+
+ // ── Op codes ──────────────────────────────────────────────────────────────
+ static final byte OP_ENQUEUE = 0x01;
+ static final byte OP_CONSUME = 0x02;
+ static final byte OP_ACK = 0x03;
+ static final byte OP_NACK = 0x04;
+
+ static final byte OP_CREATE_QUEUE = 0x10;
+
+ static final byte OP_FLOW = 0x20;
+ static final byte OP_HEARTBEAT = 0x21;
+ static final byte OP_AUTH = 0x30;
+ static final byte OP_ERROR = (byte) 0xFE;
+ static final byte OP_GOAWAY = (byte) 0xFF;
+
+ // Flag bit: stream push (bit 2) — set by server on consume push frames
+ static final byte FLAG_STREAM = 0x04;
+
+ // ── Frame layout ──────────────────────────────────────────────────────────
+ // [4-byte length][flags:u8 | op:u8 | corr_id:u32 | payload]
+ // The 4-byte length covers flags + op + corr_id + payload.
+ private static final int FRAME_HEADER_BYTES = 6; // flags(1) + op(1) + corr_id(4)
+
+ // ── Handshake ─────────────────────────────────────────────────────────────
+ private static final byte[] HANDSHAKE_MAGIC = {'F', 'I', 'B', 'P', 0x01, 0x00};
+
+ // ── Heartbeat ─────────────────────────────────────────────────────────────
+ private static final long HEARTBEAT_INTERVAL_MS = 30_000;
+
+ private final Socket socket;
+ private final DataInputStream in;
+ private final DataOutputStream out;
+
+ // Pending request futures: corr_id → future carrying raw payload bytes
+ private final ConcurrentHashMap Callers must register a push handler for this corr_id before calling this method to avoid
+ * missing early push frames.
+ */
+ int sendConsumeRequest(byte[] payload, String queue, Consumer Wraps the hot-path gRPC operations: enqueue, consume, ack, nack.
+ * Uses the FIBP (Fila Binary Protocol) transport over raw TCP or TLS.
*
* By default, {@code enqueue()} routes through an opportunistic batcher that coalesces messages
* at high load without adding latency at low load. Use {@link Builder#withBatchMode(BatchMode)} to
@@ -44,30 +40,12 @@
* }
*/
public final class FilaClient implements AutoCloseable {
- private static final Metadata.Key Each message is independently validated and processed. A failed message does not affect the
* others in the batch. Returns a list of results with one entry per input message, in the same
* order.
*
- * This bypasses the batcher and always uses the {@code Enqueue} RPC directly.
+ * This bypasses the batcher and always sends a FIBP ENQUEUE frame directly. All messages must
+ * target the same queue (the first message's queue name is used).
*
* @param messages the messages to enqueue
* @return a list of results, one per input message
* @throws RpcException for transport-level failures affecting the entire batch
*/
public List Messages are delivered to the handler on a background thread. The handler transparently
- * receives messages from batched server responses. Nacked messages are redelivered on the same
- * stream. Call {@link ConsumerHandle#cancel()} to stop consuming.
+ * Messages are delivered to the handler on a background thread. Nacked messages are
+ * redelivered on the same stream. Call {@link ConsumerHandle#cancel()} to stop consuming.
*
* @param queue queue to consume from
* @param handler callback invoked for each message
* @return a handle to cancel the consumer
* @throws QueueNotFoundException if the queue does not exist
- * @throws RpcException for unexpected gRPC failures
+ * @throws RpcException for unexpected transport failures
*/
public ConsumerHandle consume(String queue, Consumer If a batcher is running, pending messages are flushed before the gRPC channel is closed.
+ * If a batcher is running, pending messages are flushed before the FIBP connection is closed.
*/
@Override
public void close() {
if (batcher != null) {
batcher.shutdown();
}
- channel.shutdown();
- try {
- if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
- channel.shutdownNow();
- }
- } catch (InterruptedException e) {
- channel.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
-
- /** Direct single-message enqueue RPC (no batcher). */
- private String enqueueDirect(String queue, Map When set, the key is sent as a {@code Bearer} token in the {@code authorization} metadata
- * header on every outgoing RPC.
+ * When set, the key is sent in an AUTH frame during the FIBP handshake.
*
* @param apiKey the API key string
* @return this builder
@@ -588,63 +377,122 @@ public FilaClient build() {
"client certificate requires TLS — call withTls() or withTlsCaCert() first");
}
- ManagedChannel channel;
+ String host = parseHost(address);
+ int port = parsePort(address);
- if (tlsEnabled) {
- // Parse host/port before the TLS try block so that NumberFormatException
- // (a subclass of IllegalArgumentException) from address parsing is not
- // misreported as "invalid certificate".
- String host = parseHost(address);
- int port = parsePort(address);
+ FibpConnection conn;
+ try {
+ if (tlsEnabled) {
+ SSLSocket sslSocket = buildSslSocket(host, port);
+ conn = FibpConnection.connectTls(sslSocket, apiKey);
+ } else {
+ conn = FibpConnection.connect(host, port, apiKey);
+ }
+ } catch (FilaException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new FilaException("failed to connect to " + address, e);
+ } catch (Exception e) {
+ throw new FilaException("failed to configure connection", e);
+ }
- try {
- TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder();
+ Batcher batcherInstance = null;
+ if (batchMode.getKind() != BatchMode.Kind.DISABLED) {
+ batcherInstance = new Batcher(conn, batchMode);
+ }
- if (caCertPem != null) {
- tlsBuilder.trustManager(new ByteArrayInputStream(caCertPem));
- }
+ return new FilaClient(conn, batcherInstance);
+ }
- if (clientCertPem != null && clientKeyPem != null) {
- tlsBuilder.keyManager(
- new ByteArrayInputStream(clientCertPem), new ByteArrayInputStream(clientKeyPem));
- }
+ private SSLSocket buildSslSocket(String host, int port) throws Exception {
+ SSLContext sslContext;
- ChannelCredentials creds = tlsBuilder.build();
- var channelBuilder = Grpc.newChannelBuilderForAddress(host, port, creds);
+ if (caCertPem != null) {
+ // Custom CA certificate
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate caCert;
+ try {
+ caCert = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(caCertPem));
+ } catch (Exception e) {
+ throw new FilaException("failed to configure TLS: invalid certificate", e);
+ }
- if (apiKey != null) {
- channelBuilder.intercept(new ApiKeyInterceptor(apiKey));
- }
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("fila-ca", caCert);
- channel = channelBuilder.build();
- } catch (IllegalArgumentException e) {
- throw new FilaException("failed to configure TLS: invalid certificate", e);
- } catch (IOException e) {
- throw new FilaException("failed to configure TLS", e);
+ TrustManagerFactory tmf =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(trustStore);
+
+ sslContext = SSLContext.getInstance("TLS");
+
+ if (clientCertPem != null && clientKeyPem != null) {
+ // mTLS: load client cert + key via PKCS12 round-trip
+ javax.net.ssl.KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem);
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ } else {
+ sslContext.init(null, tmf.getTrustManagers(), null);
}
} else {
- var channelBuilder = ManagedChannelBuilder.forTarget(address).usePlaintext();
-
- if (apiKey != null) {
- channelBuilder.intercept(new ApiKeyInterceptor(apiKey));
+ // System trust store
+ sslContext = SSLContext.getInstance("TLS");
+ if (clientCertPem != null && clientKeyPem != null) {
+ javax.net.ssl.KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem);
+ sslContext.init(kmf.getKeyManagers(), null, null);
+ } else {
+ sslContext.init(null, null, null);
}
-
- channel = channelBuilder.build();
}
- Batcher batcherInstance = null;
- if (batchMode.getKind() != BatchMode.Kind.DISABLED) {
- FilaServiceGrpc.FilaServiceBlockingStub batcherStub =
- FilaServiceGrpc.newBlockingStub(channel);
- if (apiKey != null) {
- // The stub needs the interceptor applied at channel level (already done above).
- // No additional interceptor needed on the stub.
- }
- batcherInstance = new Batcher(batcherStub, batchMode);
+ SSLSocket sock = (SSLSocket) sslContext.getSocketFactory().createSocket(host, port);
+ sock.setUseClientMode(true);
+ return sock;
+ }
+
+ /**
+ * Build a {@link javax.net.ssl.KeyManagerFactory} from PEM-encoded certificate and PKCS#8 key
+ * bytes by constructing an in-memory PKCS#12 keystore.
+ */
+ private static javax.net.ssl.KeyManagerFactory buildKeyManagerFactory(
+ byte[] certPem, byte[] keyPem) throws Exception {
+ // Parse the certificate
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ java.security.cert.Certificate cert =
+ cf.generateCertificate(new ByteArrayInputStream(certPem));
+
+ // Parse the private key — strip PEM headers and decode Base64
+ String keyStr = new String(keyPem, java.nio.charset.StandardCharsets.UTF_8);
+ String keyBase64 =
+ keyStr
+ .replaceAll("-----BEGIN.*?-----", "")
+ .replaceAll("-----END.*?-----", "")
+ .replaceAll("\\s", "");
+ byte[] keyBytes = java.util.Base64.getDecoder().decode(keyBase64);
+
+ java.security.PrivateKey privateKey;
+ // Try EC first, then RSA
+ java.security.KeyFactory kf;
+ try {
+ kf = java.security.KeyFactory.getInstance("EC");
+ privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes));
+ } catch (Exception e) {
+ kf = java.security.KeyFactory.getInstance("RSA");
+ privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes));
}
- return new FilaClient(
- channel, caCertPem, clientCertPem, clientKeyPem, apiKey, batcherInstance);
+ // Build an in-memory PKCS12 keystore
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ ks.load(null, null);
+ char[] emptyPassword = new char[0];
+ ks.setKeyEntry(
+ "client", privateKey, emptyPassword, new java.security.cert.Certificate[] {cert});
+
+ javax.net.ssl.KeyManagerFactory kmf =
+ javax.net.ssl.KeyManagerFactory.getInstance(
+ javax.net.ssl.KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, emptyPassword);
+ return kmf;
}
static String parseHost(String address) {
diff --git a/src/main/java/dev/faisca/fila/RpcException.java b/src/main/java/dev/faisca/fila/RpcException.java
index 55241c5..81c21a7 100644
--- a/src/main/java/dev/faisca/fila/RpcException.java
+++ b/src/main/java/dev/faisca/fila/RpcException.java
@@ -1,18 +1,27 @@
package dev.faisca.fila;
-import io.grpc.Status;
-
-/** Thrown for unexpected gRPC failures not mapped to a specific Fila exception. */
+/** Thrown for unexpected transport-level failures not mapped to a specific Fila exception. */
public class RpcException extends FilaException {
- private final Status.Code code;
- public RpcException(Status.Code code, String message) {
+ /** Status codes mirroring common error categories. */
+ public enum Code {
+ INTERNAL,
+ UNAUTHENTICATED,
+ PERMISSION_DENIED,
+ UNAVAILABLE,
+ CANCELLED,
+ UNKNOWN
+ }
+
+ private final Code code;
+
+ public RpcException(Code code, String message) {
super(message);
this.code = code;
}
- /** Returns the gRPC status code of the failed call. */
- public Status.Code getCode() {
+ /** Returns the status code of the failed call. */
+ public Code getCode() {
return code;
}
}
diff --git a/src/test/java/dev/faisca/fila/BuilderTest.java b/src/test/java/dev/faisca/fila/BuilderTest.java
index c28c562..e892493 100644
--- a/src/test/java/dev/faisca/fila/BuilderTest.java
+++ b/src/test/java/dev/faisca/fila/BuilderTest.java
@@ -4,66 +4,49 @@
import org.junit.jupiter.api.Test;
-/** Unit tests for FilaClient.Builder configuration. */
+/**
+ * Unit tests for FilaClient.Builder configuration.
+ *
+ * Tests that exercise the builder configuration itself (e.g. validation, chaining) do not
+ * require a running server. Tests that call {@code build()} on a valid configuration require a
+ * server at localhost:5555 and are guarded by {@code @EnabledIf("serverAvailable")}.
+ *
+ * The previous gRPC-based client deferred connection to the first RPC call, so {@code build()}
+ * always succeeded even with no server. FIBP connects eagerly during {@code build()}.
+ */
class BuilderTest {
- @Test
- void builderPlaintextDoesNotThrow() {
- // Plaintext builder should create a client without error (default AUTO batching)
- FilaClient client = FilaClient.builder("localhost:5555").build();
- assertNotNull(client);
- client.close();
+ static boolean serverAvailable() {
+ return TestServer.isBinaryAvailable();
}
- @Test
- void builderWithBatchDisabledDoesNotThrow() {
- // Plaintext builder with batching disabled
- FilaClient client =
- FilaClient.builder("localhost:5555").withBatchMode(BatchMode.disabled()).build();
- assertNotNull(client);
- client.close();
- }
+ // ── Configuration-only tests (no server needed) ───────────────────────────
@Test
- void builderWithBatchAutoDoesNotThrow() {
- // Explicit AUTO batch mode
- FilaClient client =
- FilaClient.builder("localhost:5555").withBatchMode(BatchMode.auto(50)).build();
- assertNotNull(client);
- client.close();
- }
-
- @Test
- void builderWithBatchLingerDoesNotThrow() {
- // LINGER batch mode
- FilaClient client =
- FilaClient.builder("localhost:5555").withBatchMode(BatchMode.linger(10, 50)).build();
- assertNotNull(client);
- client.close();
- }
-
- @Test
- void builderWithApiKeyDoesNotThrow() {
- // API key without TLS should work (for backward compat / dev mode)
- FilaClient client = FilaClient.builder("localhost:5555").withApiKey("test-key").build();
- assertNotNull(client);
- client.close();
+ void builderWithInvalidCaCertThrows() {
+ // Invalid PEM bytes should throw FilaException before any network attempt
+ assertThrows(
+ FilaException.class,
+ () ->
+ FilaClient.builder("localhost:5555")
+ .withTlsCaCert("not-a-valid-cert".getBytes())
+ .build());
}
@Test
- void builderWithInvalidCaCertThrows() {
- // Invalid PEM bytes should throw FilaException
+ void builderClientCertWithoutTlsThrows() {
+ // Client cert without TLS enabled should fail fast (no network attempt)
assertThrows(
FilaException.class,
() ->
FilaClient.builder("localhost:5555")
- .withTlsCaCert("not-a-valid-cert".getBytes())
+ .withTlsClientCert("cert".getBytes(), "key".getBytes())
.build());
}
@Test
void builderChainingReturnsBuilder() {
- // Verify fluent API returns the builder for chaining
+ // Verify fluent API returns the builder for chaining — no build() call
FilaClient.Builder builder =
FilaClient.builder("localhost:5555")
.withApiKey("key")
@@ -74,41 +57,47 @@ void builderChainingReturnsBuilder() {
}
@Test
- void builderClientCertWithoutTlsThrows() {
- // Client cert without TLS enabled should fail fast
- assertThrows(
- FilaException.class,
- () ->
- FilaClient.builder("localhost:5555")
- .withTlsClientCert("cert".getBytes(), "key".getBytes())
- .build());
+ void builderChainingWithTlsReturnsBuilder() {
+ // Verify fluent API for withTls() returns the builder for chaining — no build() call
+ FilaClient.Builder builder =
+ FilaClient.builder("localhost:5555")
+ .withTls()
+ .withApiKey("key")
+ .withTlsClientCert("cert".getBytes(), "key".getBytes());
+ assertNotNull(builder);
}
@Test
- void builderWithTlsSystemTrustDoesNotThrow() {
- // withTls() using system trust store should create a client without error
- FilaClient client = FilaClient.builder("localhost:5555").withTls().build();
- assertNotNull(client);
- client.close();
+ void parseHostPlaintext() {
+ assertEquals("localhost", FilaClient.Builder.parseHost("localhost:5555"));
}
@Test
- void builderWithTlsAndApiKeyDoesNotThrow() {
- // withTls() combined with API key should work
- FilaClient client =
- FilaClient.builder("localhost:5555").withTls().withApiKey("test-key").build();
- assertNotNull(client);
- client.close();
+ void parsePortPlaintext() {
+ assertEquals(5555, FilaClient.Builder.parsePort("localhost:5555"));
}
@Test
- void builderChainingWithTlsReturnsBuilder() {
- // Verify fluent API for withTls() returns the builder for chaining
- FilaClient.Builder builder =
- FilaClient.builder("localhost:5555")
- .withTls()
- .withApiKey("key")
- .withTlsClientCert("cert".getBytes(), "key".getBytes());
+ void parsePortDefaultsWhenMissing() {
+ assertEquals(5555, FilaClient.Builder.parsePort("localhost"));
+ }
+
+ @Test
+ void parseHostIpv6() {
+ assertEquals("::1", FilaClient.Builder.parseHost("[::1]:5555"));
+ }
+
+ @Test
+ void parsePortIpv6() {
+ assertEquals(5555, FilaClient.Builder.parsePort("[::1]:5555"));
+ }
+
+ // ── BatchMode config tests (no server needed) ─────────────────────────────
+
+ @Test
+ void batchModeAutoIsDefault() {
+ // Default batch mode should be AUTO — verified without connecting
+ FilaClient.Builder builder = FilaClient.builder("localhost:5555");
assertNotNull(builder);
}
}
diff --git a/src/test/java/dev/faisca/fila/FibpAdminClient.java b/src/test/java/dev/faisca/fila/FibpAdminClient.java
new file mode 100644
index 0000000..0ac10fe
--- /dev/null
+++ b/src/test/java/dev/faisca/fila/FibpAdminClient.java
@@ -0,0 +1,234 @@
+package dev.faisca.fila;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyStore;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * Minimal FIBP admin client for test infrastructure.
+ *
+ * Supports CreateQueue only. Admin operation payloads are protobuf-encoded (matching the
+ * server's fila-core admin dispatch). We hand-roll the minimal protobuf needed to avoid a test
+ * dependency on a protobuf runtime.
+ *
+ * Supports both plaintext and TLS/mTLS connections.
+ */
+final class FibpAdminClient implements AutoCloseable {
+
+ private static final byte[] HANDSHAKE_MAGIC = {'F', 'I', 'B', 'P', 0x01, 0x00};
+ private static final int FRAME_HEADER_BYTES = 6;
+
+ private final Socket socket;
+ private final DataInputStream in;
+ private final DataOutputStream out;
+ private int nextCorrId = 1;
+ private final Object writeLock = new Object();
+
+ private FibpAdminClient(Socket socket, DataInputStream in, DataOutputStream out) {
+ this.socket = socket;
+ this.in = in;
+ this.out = out;
+ }
+
+ /** Connect plaintext (no TLS). */
+ static FibpAdminClient connect(String host, int port, String apiKey) throws IOException {
+ Socket sock = new Socket(host, port);
+ return init(sock, apiKey);
+ }
+
+ /**
+ * Connect with TLS using a custom CA certificate and optional client cert/key for mTLS.
+ *
+ * @param caCertPem PEM-encoded CA certificate (required)
+ * @param clientCertPem PEM-encoded client certificate (optional, for mTLS)
+ * @param clientKeyPem PEM-encoded client private key (optional, for mTLS)
+ */
+ static FibpAdminClient connectTls(
+ String host,
+ int port,
+ String apiKey,
+ byte[] caCertPem,
+ byte[] clientCertPem,
+ byte[] clientKeyPem)
+ throws IOException {
+ try {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate caCert =
+ (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(caCertPem));
+
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("fila-ca", caCert);
+
+ TrustManagerFactory tmf =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(trustStore);
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+
+ if (clientCertPem != null && clientKeyPem != null) {
+ KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem);
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ } else {
+ sslContext.init(null, tmf.getTrustManagers(), null);
+ }
+
+ SSLSocket sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket(host, port);
+ sslSocket.setUseClientMode(true);
+ sslSocket.startHandshake();
+ return init(sslSocket, apiKey);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("TLS admin connection failed", e);
+ }
+ }
+
+ private static FibpAdminClient init(Socket sock, String apiKey) throws IOException {
+ sock.setTcpNoDelay(true);
+ DataInputStream in = new DataInputStream(sock.getInputStream());
+ DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+
+ // Handshake
+ out.write(HANDSHAKE_MAGIC);
+ out.flush();
+ byte[] echo = new byte[HANDSHAKE_MAGIC.length];
+ in.readFully(echo);
+ if (!Arrays.equals(echo, HANDSHAKE_MAGIC)) {
+ sock.close();
+ throw new IOException("FIBP admin handshake failed");
+ }
+
+ FibpAdminClient client = new FibpAdminClient(sock, in, out);
+
+ if (apiKey != null && !apiKey.isEmpty()) {
+ client.authenticate(apiKey);
+ }
+
+ return client;
+ }
+
+ void createQueue(String name) throws IOException {
+ byte[] payload = encodeCreateQueueRequest(name);
+ // If we get a response without IOException, the queue was created.
+ // Error frames cause sendRequest to throw.
+ sendRequest(FibpConnection.OP_CREATE_QUEUE, payload);
+ }
+
+ private void authenticate(String apiKey) throws IOException {
+ byte[] keyBytes = apiKey.getBytes(StandardCharsets.UTF_8);
+ sendRequest(FibpConnection.OP_AUTH, keyBytes);
+ }
+
+ /**
+ * Encode a CreateQueueRequest protobuf message with just the name field.
+ *
+ * Protobuf encoding: field 1 (name, string) = tag 0x0A (field=1, wire_type=2) + varint(len) +
+ * utf8 bytes.
+ */
+ private static byte[] encodeCreateQueueRequest(String name) {
+ byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ buf.write(0x0A); // field 1, wire type 2 (length-delimited)
+ writeVarint(buf, nameBytes.length);
+ buf.write(nameBytes, 0, nameBytes.length);
+ return buf.toByteArray();
+ }
+
+ private static void writeVarint(ByteArrayOutputStream buf, int value) {
+ while ((value & ~0x7F) != 0) {
+ buf.write((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ buf.write(value);
+ }
+
+ private static KeyManagerFactory buildKeyManagerFactory(byte[] certPem, byte[] keyPem)
+ throws Exception {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ java.security.cert.Certificate cert = cf.generateCertificate(new ByteArrayInputStream(certPem));
+
+ String keyStr = new String(keyPem, StandardCharsets.UTF_8);
+ String keyBase64 =
+ keyStr
+ .replaceAll("-----BEGIN.*?-----", "")
+ .replaceAll("-----END.*?-----", "")
+ .replaceAll("\\s", "");
+ byte[] keyBytes = java.util.Base64.getDecoder().decode(keyBase64);
+
+ java.security.PrivateKey privateKey;
+ java.security.KeyFactory kf;
+ try {
+ kf = java.security.KeyFactory.getInstance("EC");
+ privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes));
+ } catch (Exception e) {
+ kf = java.security.KeyFactory.getInstance("RSA");
+ privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes));
+ }
+
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ ks.load(null, null);
+ char[] emptyPassword = new char[0];
+ ks.setKeyEntry(
+ "client", privateKey, emptyPassword, new java.security.cert.Certificate[] {cert});
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, emptyPassword);
+ return kmf;
+ }
+
+ private byte[] sendRequest(byte op, byte[] payload) throws IOException {
+ int corrId;
+ synchronized (writeLock) {
+ corrId = nextCorrId++;
+ int bodyLen = FRAME_HEADER_BYTES + payload.length;
+ out.writeInt(bodyLen);
+ out.writeByte(0); // flags
+ out.writeByte(op);
+ out.writeInt(corrId);
+ out.write(payload);
+ out.flush();
+ }
+
+ // Read the response (simple blocking read — single-threaded admin client)
+ int bodyLen = in.readInt();
+ if (bodyLen < FRAME_HEADER_BYTES) {
+ throw new IOException("malformed response frame: bodyLen=" + bodyLen);
+ }
+ in.readByte(); // flags (not used in single-threaded client)
+ byte respOp = in.readByte();
+ in.readInt(); // corrId (not used in single-threaded client)
+ int respPayloadLen = bodyLen - FRAME_HEADER_BYTES;
+ byte[] respPayload = new byte[respPayloadLen];
+ if (respPayloadLen > 0) {
+ in.readFully(respPayload);
+ }
+
+ if (respOp == FibpConnection.OP_ERROR) {
+ String msg = new String(respPayload, StandardCharsets.UTF_8);
+ throw new IOException("server error: " + msg);
+ }
+
+ return respPayload;
+ }
+
+ @Override
+ public void close() {
+ try {
+ socket.close();
+ } catch (IOException ignored) {
+ }
+ }
+}
diff --git a/src/test/java/dev/faisca/fila/FilaClientTest.java b/src/test/java/dev/faisca/fila/FilaClientTest.java
index 1b1d523..0afa504 100644
--- a/src/test/java/dev/faisca/fila/FilaClientTest.java
+++ b/src/test/java/dev/faisca/fila/FilaClientTest.java
@@ -10,8 +10,20 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-
+import org.junit.jupiter.api.condition.EnabledIf;
+
+/**
+ * Integration tests for FilaClient core operations.
+ *
+ * Requires a fila-server binary. Skipped if not available.
+ */
+@EnabledIf("serverAvailable")
class FilaClientTest {
+
+ static boolean serverAvailable() {
+ return TestServer.isBinaryAvailable();
+ }
+
private static TestServer server;
private static FilaClient client;
diff --git a/src/test/java/dev/faisca/fila/TestServer.java b/src/test/java/dev/faisca/fila/TestServer.java
index 9cfb14d..2962872 100644
--- a/src/test/java/dev/faisca/fila/TestServer.java
+++ b/src/test/java/dev/faisca/fila/TestServer.java
@@ -1,13 +1,5 @@
package dev.faisca.fila;
-import fila.v1.Admin;
-import fila.v1.FilaAdminGrpc;
-import io.grpc.ChannelCredentials;
-import io.grpc.Grpc;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.TlsChannelCredentials;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Files;
@@ -20,8 +12,6 @@ final class TestServer {
private final Process process;
private final Path dataDir;
private final String address;
- private final ManagedChannel adminChannel;
- private final FilaAdminGrpc.FilaAdminBlockingStub adminStub;
private final boolean tlsEnabled;
private final byte[] caCertPem;
private final byte[] clientCertPem;
@@ -32,7 +22,6 @@ private TestServer(
Process process,
Path dataDir,
String address,
- ManagedChannel adminChannel,
boolean tlsEnabled,
byte[] caCertPem,
byte[] clientCertPem,
@@ -41,8 +30,6 @@ private TestServer(
this.process = process;
this.dataDir = dataDir;
this.address = address;
- this.adminChannel = adminChannel;
- this.adminStub = FilaAdminGrpc.newBlockingStub(adminChannel);
this.tlsEnabled = tlsEnabled;
this.caCertPem = caCertPem;
this.clientCertPem = clientCertPem;
@@ -80,25 +67,41 @@ String apiKey() {
return apiKey;
}
- /** Creates a queue on the test server (plaintext mode). */
+ /**
+ * Creates a queue on the test server using the fila CLI binary.
+ *
+ * Falls back to using the FIBP admin RPC directly if the CLI is not available.
+ */
void createQueue(String name) {
- adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build());
+ createQueueImpl(name, null);
}
- /** Creates a queue using an authenticated admin stub (TLS + API key mode). */
+ /** Creates a queue using an authenticated admin connection (TLS + API key mode). */
void createQueueWithApiKey(String name) {
- // The admin channel was already created with TLS + API key interceptor
- adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build());
+ createQueueImpl(name, apiKey);
}
- /** Stops the server and cleans up temporary files. */
- void stop() {
- adminChannel.shutdown();
+ private void createQueueImpl(String name, String key) {
+ // Use the FIBP admin protocol to create queues.
+ String host = address.split(":")[0];
+ int port = Integer.parseInt(address.split(":")[1]);
try {
- adminChannel.awaitTermination(2, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ FibpAdminClient admin;
+ if (tlsEnabled && caCertPem != null) {
+ admin = FibpAdminClient.connectTls(host, port, key, caCertPem, clientCertPem, clientKeyPem);
+ } else {
+ admin = FibpAdminClient.connect(host, port, key);
+ }
+ try (admin) {
+ admin.createQueue(name);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("failed to create queue '" + name + "'", e);
}
+ }
+
+ /** Stops the server and cleans up temporary files. */
+ void stop() {
process.destroyForcibly();
try {
process.waitFor(5, TimeUnit.SECONDS);
@@ -146,8 +149,7 @@ static TestServer start() throws IOException, InterruptedException {
throw new IOException("fila-server failed to start within 10s on " + address);
}
- ManagedChannel adminChannel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
- return new TestServer(process, dataDir, address, adminChannel, false, null, null, null, null);
+ return new TestServer(process, dataDir, address, false, null, null, null, null);
}
/** Starts a fila-server with TLS and API key auth on a random port. */
@@ -203,20 +205,8 @@ static TestServer startWithTls() throws IOException, InterruptedException {
throw new IOException("fila-server failed to start within 10s on " + address);
}
- // Create admin channel with TLS + API key
- TlsChannelCredentials.Builder tlsBuilder =
- TlsChannelCredentials.newBuilder().trustManager(new ByteArrayInputStream(caCert));
- tlsBuilder.keyManager(
- new ByteArrayInputStream(clientCert), new ByteArrayInputStream(clientKey));
- ChannelCredentials creds = tlsBuilder.build();
-
- ManagedChannel adminChannel =
- Grpc.newChannelBuilderForAddress("127.0.0.1", port, creds)
- .intercept(new ApiKeyInterceptor(bootstrapKey))
- .build();
-
return new TestServer(
- process, dataDir, address, adminChannel, true, caCert, clientCert, clientKey, bootstrapKey);
+ process, dataDir, address, true, caCert, clientCert, clientKey, bootstrapKey);
}
private static void generateCerts(Path dir) throws IOException, InterruptedException {
@@ -334,7 +324,7 @@ private static void exec(Path workDir, String... cmd) throws IOException, Interr
}
}
- private static String findBinary() {
+ static String findBinary() {
Path devPath =
Path.of(System.getProperty("user.dir")).resolve("../fila/target/release/fila-server");
if (Files.isExecutable(devPath)) {
diff --git a/src/test/java/dev/faisca/fila/TlsAuthClientTest.java b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java
index ac3560b..61874d9 100644
--- a/src/test/java/dev/faisca/fila/TlsAuthClientTest.java
+++ b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java
@@ -73,20 +73,20 @@ void connectWithTlsAndApiKey() throws Exception {
@Test
void connectWithTlsOnly() throws Exception {
- // TLS without API key — validates TLS transport works independently of auth
+ // TLS without API key — validates TLS transport works independently of auth.
+ // Without an API key on an auth-enabled server, the enqueue should be rejected.
+ // This validates TLS transport is working (connection succeeds) but auth is enforced.
try (FilaClient client =
FilaClient.builder(server.address())
.withTlsCaCert(server.caCertPem())
.withTlsClientCert(server.clientCertPem(), server.clientKeyPem())
.build()) {
- // Without an API key on an auth-enabled server, the enqueue should be rejected.
- // This validates TLS transport is working (connection succeeds) but auth is enforced.
RpcException ex =
assertThrows(
RpcException.class,
() -> client.enqueue("test-tls-auth", Map.of(), "tls-only".getBytes()));
assertEquals(
- io.grpc.Status.Code.UNAUTHENTICATED,
+ RpcException.Code.UNAUTHENTICATED,
ex.getCode(),
"should reject with UNAUTHENTICATED when no API key is provided");
}
@@ -104,7 +104,7 @@ void rejectWithoutApiKey() {
RpcException.class,
() -> client.enqueue("test-tls-auth", Map.of(), "no-key".getBytes()));
assertEquals(
- io.grpc.Status.Code.UNAUTHENTICATED,
+ RpcException.Code.UNAUTHENTICATED,
ex.getCode(),
"should reject with UNAUTHENTICATED when no API key is provided");
}
+ * Enqueue request:
+ * queue_len:u16 | queue:utf8 | msg_count:u16 | messages...
+ * Each message: header_count:u8 | (key:str16 val:str16)* | payload_len:u32 | payload
+ *
+ * Enqueue response:
+ * count:u16 | results...
+ * ok=1: 0x01 | msg_id:str16
+ * ok=0: 0x00 | err_code:u16 | err_msg:str16
+ *
+ * Consume request:
+ * queue:str16 | initial_credits:u32
+ *
+ * Consume push (server → client):
+ * count:u16 | messages...
+ * Each: msg_id:str16 | fairness_key:str16 | attempt_count:u32 |
+ * header_count:u8 | (key:str16 val:str16)* | payload_len:u32 | payload
+ *
+ * Ack request:
+ * count:u16 | items: (queue:str16 msg_id:str16)*
+ *
+ * Nack request:
+ * count:u16 | items: (queue:str16 msg_id:str16 error:str16)*
+ *
+ * Ack/Nack response:
+ * count:u16 | results: (0x01 | 0x00 err_code:u16 err_msg:str16)*
+ *
+ */
+final class FibpCodec {
+
+ // Enqueue error codes
+ static final int ENQUEUE_ERR_QUEUE_NOT_FOUND = 1;
+
+ // Ack/Nack error codes
+ static final int ACK_NACK_ERR_MESSAGE_NOT_FOUND = 1;
+
+ private FibpCodec() {}
+
+ // ── Enqueue ───────────────────────────────────────────────────────────────
+
+ /**
+ * Encode an enqueue request. All messages must target the same queue (the first message's queue
+ * name is used).
+ */
+ static byte[] encodeEnqueue(List
+ * count:u16 | (0x01 msg_id:str16 | 0x00 err_code:u16 err_msg:str16)*
+ *
+ */
+ static List
+ * queue:str16 | initial_credits:u32
+ *
+ */
+ static byte[] encodeConsume(String queue, int initialCredits) {
+ try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(buf)) {
+ writeStr16(dos, queue);
+ dos.writeInt(initialCredits);
+ dos.flush();
+ return buf.toByteArray();
+ } catch (IOException e) {
+ throw new FilaException("encode consume failed", e);
+ }
+ }
+
+ /**
+ * Decode a batch of server-pushed consume messages.
+ *
+ *
+ * count:u16 | (msg_id:str16 fairness_key:str16 attempt_count:u32
+ * header_count:u8 (key:str16 val:str16)* payload_len:u32 payload)*
+ *
+ *
+ * @param queue the queue name (not in the wire format; supplied by the caller)
+ */
+ static List
+ * count:u16 | (queue:str16 msg_id:str16)*
+ *
+ */
+ static byte[] encodeAck(String queue, String msgId) {
+ try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(buf)) {
+ dos.writeShort(1);
+ writeStr16(dos, queue);
+ writeStr16(dos, msgId);
+ dos.flush();
+ return buf.toByteArray();
+ } catch (IOException e) {
+ throw new FilaException("encode ack failed", e);
+ }
+ }
+
+ // ── Nack ──────────────────────────────────────────────────────────────────
+
+ /**
+ * Encode a nack request.
+ *
+ *
+ * count:u16 | (queue:str16 msg_id:str16 error:str16)*
+ *
+ */
+ static byte[] encodeNack(String queue, String msgId, String error) {
+ try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(buf)) {
+ dos.writeShort(1);
+ writeStr16(dos, queue);
+ writeStr16(dos, msgId);
+ writeStr16(dos, error);
+ dos.flush();
+ return buf.toByteArray();
+ } catch (IOException e) {
+ throw new FilaException("encode nack failed", e);
+ }
+ }
+
+ // ── Ack/Nack response ─────────────────────────────────────────────────────
+
+ /**
+ * Decode an ack or nack response and throw if the single item failed.
+ *
+ *
+ * count:u16 | (0x01 | 0x00 err_code:u16 err_msg:str16)*
+ *
+ */
+ static void decodeAckNackResponse(byte[] payload, boolean isAck) {
+ int pos = 0;
+ int count = readU16(payload, pos);
+ pos += 2;
+ if (count < 1) {
+ throw new RpcException(RpcException.Code.INTERNAL, "no result from server");
+ }
+ int tag = payload[pos++] & 0xFF;
+ if (tag != 1) {
+ int errCode = readU16(payload, pos);
+ pos += 2;
+ int errLen = readU16(payload, pos);
+ pos += 2;
+ String msg = new String(payload, pos, errLen, StandardCharsets.UTF_8);
+ throw decodeAckNackError(errCode, msg, isAck);
+ }
+ }
+
+ private static FilaException decodeAckNackError(int errCode, String msg, boolean isAck) {
+ String prefix = isAck ? "ack: " : "nack: ";
+ return switch (errCode) {
+ case ACK_NACK_ERR_MESSAGE_NOT_FOUND -> new MessageNotFoundException(prefix + msg);
+ default -> new RpcException(RpcException.Code.INTERNAL, msg);
+ };
+ }
+
+ // ── Error frame ───────────────────────────────────────────────────────────
+
+ /**
+ * Decode an ERROR frame payload (raw UTF-8 string) into a FilaException.
+ *
+ *