diff --git a/README.md b/README.md index 67d97ef..5844151 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,77 @@ try (FilaClient client = FilaClient.builder("localhost:5555").build()) { } ``` +## TLS + +### System trust store (public CAs) + +If the Fila server uses a certificate issued by a public CA (e.g., Let's Encrypt), enable TLS with the JVM's default trust store: + +```java +try (FilaClient client = FilaClient.builder("localhost:5555") + .withTls() + .build()) { + // use client... +} +``` + +### Custom CA certificate + +For servers using self-signed or private CA certificates, provide the CA cert explicitly: + +```java +byte[] caCert = Files.readAllBytes(Path.of("ca.pem")); + +try (FilaClient client = FilaClient.builder("localhost:5555") + .withTlsCaCert(caCert) + .build()) { + // use client... +} +``` + +### Mutual TLS (mTLS) + +For mutual TLS, also provide the client certificate and key. This works with both trust modes: + +```java +byte[] caCert = Files.readAllBytes(Path.of("ca.pem")); +byte[] clientCert = Files.readAllBytes(Path.of("client.pem")); +byte[] clientKey = Files.readAllBytes(Path.of("client-key.pem")); + +try (FilaClient client = FilaClient.builder("localhost:5555") + .withTlsCaCert(caCert) + .withTlsClientCert(clientCert, clientKey) + .build()) { + // use client... +} +``` + +## API Key Authentication + +When the server has auth enabled, provide an API key: + +```java +try (FilaClient client = FilaClient.builder("localhost:5555") + .withApiKey("your-api-key") + .build()) { + // use client... +} +``` + +The key is sent as a `Bearer` token in the `authorization` metadata header on every RPC. + +TLS and API key auth can be combined: + +```java +try (FilaClient client = FilaClient.builder("localhost:5555") + .withTlsCaCert(caCert) + .withTlsClientCert(clientCert, clientKey) + .withApiKey("your-api-key") + .build()) { + // use client... +} +``` + ## API Reference ### `FilaClient` @@ -56,6 +127,17 @@ FilaClient client = FilaClient.builder("localhost:5555").build(); `FilaClient` implements `AutoCloseable` for use with try-with-resources. +#### Builder Methods + +| Method | Description | +|--------|-------------| +| `withTls()` | Enable TLS using JVM's default trust store (cacerts) | +| `withTlsCaCert(byte[] caCertPem)` | CA certificate for TLS server verification (implies `withTls()`) | +| `withTlsClientCert(byte[] certPem, byte[] keyPem)` | Client cert + key for mTLS | +| `withApiKey(String apiKey)` | API key sent as `Bearer` token on every RPC | + +All builder methods are optional. When none are set, the client connects over plaintext without authentication (backward compatible). + #### `enqueue(String queue, Map headers, byte[] payload) -> String` Enqueue a message. Returns the broker-assigned message ID (UUIDv7). diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto index bf9d5ca..886e58d 100644 --- a/proto/fila/v1/admin.proto +++ b/proto/fila/v1/admin.proto @@ -11,6 +11,15 @@ service FilaAdmin { rpc GetStats(GetStatsRequest) returns (GetStatsResponse); rpc Redrive(RedriveRequest) returns (RedriveResponse); rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); + + // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. + rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); + rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); + rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); + + // Per-key ACL management. + rpc SetAcl(SetAclRequest) returns (SetAclResponse); + rpc GetAcl(GetAclRequest) returns (GetAclResponse); } message CreateQueueRequest { @@ -89,6 +98,9 @@ message GetStatsResponse { uint32 quantum = 5; repeated PerFairnessKeyStats per_key_stats = 6; repeated PerThrottleKeyStats per_throttle_stats = 7; + // Cluster fields (0 when not in cluster mode). + uint64 leader_node_id = 8; + uint32 replication_count = 9; } message RedriveRequest { @@ -107,8 +119,79 @@ message QueueInfo { uint64 depth = 2; uint64 in_flight = 3; uint32 active_consumers = 4; + uint64 leader_node_id = 5; } message ListQueuesResponse { repeated QueueInfo queues = 1; + uint32 cluster_node_count = 2; +} + +// --- API Key Management --- + +message CreateApiKeyRequest { + /// Human-readable label for the key. + string name = 1; + /// Optional Unix timestamp (milliseconds) after which the key expires. + /// 0 means no expiration. + uint64 expires_at_ms = 2; + /// When true, the key bypasses all ACL checks (superadmin). + bool is_superadmin = 3; +} + +message CreateApiKeyResponse { + /// Opaque key ID for management operations (revoke, list, set-acl). + string key_id = 1; + /// Plaintext API key. Returned once — store it securely. + string key = 2; + /// Whether this key has superadmin privileges. + bool is_superadmin = 3; +} + +message RevokeApiKeyRequest { + string key_id = 1; +} + +message RevokeApiKeyResponse {} + +message ListApiKeysRequest {} + +message ApiKeyInfo { + string key_id = 1; + string name = 2; + uint64 created_at_ms = 3; + /// 0 means no expiration. + uint64 expires_at_ms = 4; + bool is_superadmin = 5; +} + +message ListApiKeysResponse { + repeated ApiKeyInfo keys = 1; +} + +// --- ACL Management --- + +/// A single permission grant: kind (produce/consume/admin) + queue pattern. +message AclPermission { + /// One of: "produce", "consume", "admin". + string kind = 1; + /// Queue name or wildcard ("*" or "orders.*"). + string pattern = 2; +} + +message SetAclRequest { + string key_id = 1; + repeated AclPermission permissions = 2; +} + +message SetAclResponse {} + +message GetAclRequest { + string key_id = 1; +} + +message GetAclResponse { + string key_id = 1; + repeated AclPermission permissions = 2; + bool is_superadmin = 3; } diff --git a/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java b/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java new file mode 100644 index 0000000..e7ea461 --- /dev/null +++ b/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java @@ -0,0 +1,36 @@ +package dev.faisca.fila; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; + +/** + * gRPC client interceptor that attaches a {@code Bearer} API key to the {@code authorization} + * metadata header on every outgoing RPC. + */ +final class ApiKeyInterceptor implements ClientInterceptor { + private static final Metadata.Key AUTH_KEY = + Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER); + + private final String headerValue; + + ApiKeyInterceptor(String apiKey) { + this.headerValue = "Bearer " + apiKey; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall<>(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(AUTH_KEY, headerValue); + super.start(responseListener, headers); + } + }; + } +} diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java index f7204f0..8551fc0 100644 --- a/src/main/java/dev/faisca/fila/FilaClient.java +++ b/src/main/java/dev/faisca/fila/FilaClient.java @@ -3,10 +3,15 @@ import fila.v1.FilaServiceGrpc; import fila.v1.Messages; import fila.v1.Service; +import io.grpc.ChannelCredentials; import io.grpc.Context; +import io.grpc.Grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; +import io.grpc.TlsChannelCredentials; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -208,15 +213,159 @@ private static FilaException mapNackError(StatusRuntimeException e) { /** Builder for {@link FilaClient}. */ public static final class Builder { private final String address; + private boolean tlsEnabled; + private byte[] caCertPem; + private byte[] clientCertPem; + private byte[] clientKeyPem; + private String apiKey; private Builder(String address) { this.address = address; } + /** + * Enable TLS using the JVM's default trust store (cacerts). + * + *

Use this when the Fila server's certificate is issued by a public CA already trusted by + * the JVM. For servers using self-signed or private CA certificates, use {@link + * #withTlsCaCert(byte[])} instead. + * + * @return this builder + */ + public Builder withTls() { + this.tlsEnabled = true; + return this; + } + + /** + * Set the CA certificate for TLS server verification. + * + *

When set, the client connects over TLS instead of plaintext. The CA certificate is used to + * verify the server's identity. Implies {@link #withTls()}. + * + * @param caCertPem PEM-encoded CA certificate bytes + * @return this builder + */ + public Builder withTlsCaCert(byte[] caCertPem) { + this.caCertPem = caCertPem; + this.tlsEnabled = true; + return this; + } + + /** + * Set the client certificate and key for mutual TLS (mTLS). + * + *

Requires either {@link #withTls()} or {@link #withTlsCaCert(byte[])} to be called first. + * When provided, the client presents its certificate to the server for mutual authentication. + * + * @param certPem PEM-encoded client certificate bytes + * @param keyPem PEM-encoded client private key bytes + * @return this builder + */ + public Builder withTlsClientCert(byte[] certPem, byte[] keyPem) { + this.clientCertPem = certPem; + this.clientKeyPem = keyPem; + return this; + } + + /** + * Set an API key for authentication. + * + *

When set, the key is sent as a {@code Bearer} token in the {@code authorization} metadata + * header on every outgoing RPC. + * + * @param apiKey the API key string + * @return this builder + */ + public Builder withApiKey(String apiKey) { + this.apiKey = apiKey; + return this; + } + /** Build and connect the client. */ public FilaClient build() { - ManagedChannel channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); + if (clientCertPem != null && !tlsEnabled) { + throw new FilaException( + "client certificate requires TLS — call withTls() or withTlsCaCert() first"); + } + + ManagedChannel channel; + + if (tlsEnabled) { + // Parse host/port before the TLS try block so that NumberFormatException + // (a subclass of IllegalArgumentException) from address parsing is not + // misreported as "invalid certificate". + String host = parseHost(address); + int port = parsePort(address); + + try { + TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder(); + + if (caCertPem != null) { + tlsBuilder.trustManager(new ByteArrayInputStream(caCertPem)); + } + + if (clientCertPem != null && clientKeyPem != null) { + tlsBuilder.keyManager( + new ByteArrayInputStream(clientCertPem), new ByteArrayInputStream(clientKeyPem)); + } + + ChannelCredentials creds = tlsBuilder.build(); + var channelBuilder = Grpc.newChannelBuilderForAddress(host, port, creds); + + if (apiKey != null) { + channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); + } + + channel = channelBuilder.build(); + } catch (IllegalArgumentException e) { + throw new FilaException("failed to configure TLS: invalid certificate", e); + } catch (IOException e) { + throw new FilaException("failed to configure TLS", e); + } + } else { + var channelBuilder = ManagedChannelBuilder.forTarget(address).usePlaintext(); + + if (apiKey != null) { + channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); + } + + channel = channelBuilder.build(); + } + return new FilaClient(channel); } + + private static String parseHost(String address) { + // Handle IPv6 bracket notation: [::1]:5555 + if (address.startsWith("[")) { + int closeBracket = address.indexOf(']'); + if (closeBracket < 0) { + return address; + } + return address.substring(1, closeBracket); + } + int colonIdx = address.lastIndexOf(':'); + if (colonIdx < 0) { + return address; + } + return address.substring(0, colonIdx); + } + + private static int parsePort(String address) { + // Handle IPv6 bracket notation: [::1]:5555 + if (address.startsWith("[")) { + int closeBracket = address.indexOf(']'); + if (closeBracket < 0 || closeBracket + 2 > address.length()) { + return 5555; + } + return Integer.parseInt(address.substring(closeBracket + 2)); + } + int colonIdx = address.lastIndexOf(':'); + if (colonIdx < 0) { + return 5555; + } + return Integer.parseInt(address.substring(colonIdx + 1)); + } } } diff --git a/src/test/java/dev/faisca/fila/BuilderTest.java b/src/test/java/dev/faisca/fila/BuilderTest.java new file mode 100644 index 0000000..b144878 --- /dev/null +++ b/src/test/java/dev/faisca/fila/BuilderTest.java @@ -0,0 +1,86 @@ +package dev.faisca.fila; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +/** Unit tests for FilaClient.Builder configuration. */ +class BuilderTest { + + @Test + void builderPlaintextDoesNotThrow() { + // Plaintext builder should create a client without error + FilaClient client = FilaClient.builder("localhost:5555").build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderWithApiKeyDoesNotThrow() { + // API key without TLS should work (for backward compat / dev mode) + FilaClient client = FilaClient.builder("localhost:5555").withApiKey("test-key").build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderWithInvalidCaCertThrows() { + // Invalid PEM bytes should throw FilaException + assertThrows( + FilaException.class, + () -> + FilaClient.builder("localhost:5555") + .withTlsCaCert("not-a-valid-cert".getBytes()) + .build()); + } + + @Test + void builderChainingReturnsBuilder() { + // Verify fluent API returns the builder for chaining + FilaClient.Builder builder = + FilaClient.builder("localhost:5555") + .withApiKey("key") + .withTlsCaCert("cert".getBytes()) + .withTlsClientCert("cert".getBytes(), "key".getBytes()); + assertNotNull(builder); + } + + @Test + void builderClientCertWithoutTlsThrows() { + // Client cert without TLS enabled should fail fast + assertThrows( + FilaException.class, + () -> + FilaClient.builder("localhost:5555") + .withTlsClientCert("cert".getBytes(), "key".getBytes()) + .build()); + } + + @Test + void builderWithTlsSystemTrustDoesNotThrow() { + // withTls() using system trust store should create a client without error + FilaClient client = FilaClient.builder("localhost:5555").withTls().build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderWithTlsAndApiKeyDoesNotThrow() { + // withTls() combined with API key should work + FilaClient client = + FilaClient.builder("localhost:5555").withTls().withApiKey("test-key").build(); + assertNotNull(client); + client.close(); + } + + @Test + void builderChainingWithTlsReturnsBuilder() { + // Verify fluent API for withTls() returns the builder for chaining + FilaClient.Builder builder = + FilaClient.builder("localhost:5555") + .withTls() + .withApiKey("key") + .withTlsClientCert("cert".getBytes(), "key".getBytes()); + assertNotNull(builder); + } +} diff --git a/src/test/java/dev/faisca/fila/TestServer.java b/src/test/java/dev/faisca/fila/TestServer.java index c3c99e7..9cfb14d 100644 --- a/src/test/java/dev/faisca/fila/TestServer.java +++ b/src/test/java/dev/faisca/fila/TestServer.java @@ -2,8 +2,12 @@ import fila.v1.Admin; import fila.v1.FilaAdminGrpc; +import io.grpc.ChannelCredentials; +import io.grpc.Grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.TlsChannelCredentials; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.ServerSocket; import java.nio.file.Files; @@ -18,13 +22,32 @@ final class TestServer { private final String address; private final ManagedChannel adminChannel; private final FilaAdminGrpc.FilaAdminBlockingStub adminStub; + private final boolean tlsEnabled; + private final byte[] caCertPem; + private final byte[] clientCertPem; + private final byte[] clientKeyPem; + private final String apiKey; - private TestServer(Process process, Path dataDir, String address, ManagedChannel adminChannel) { + private TestServer( + Process process, + Path dataDir, + String address, + ManagedChannel adminChannel, + boolean tlsEnabled, + byte[] caCertPem, + byte[] clientCertPem, + byte[] clientKeyPem, + String apiKey) { this.process = process; this.dataDir = dataDir; this.address = address; this.adminChannel = adminChannel; this.adminStub = FilaAdminGrpc.newBlockingStub(adminChannel); + this.tlsEnabled = tlsEnabled; + this.caCertPem = caCertPem; + this.clientCertPem = clientCertPem; + this.clientKeyPem = clientKeyPem; + this.apiKey = apiKey; } /** Returns the address of the running server. */ @@ -32,11 +55,42 @@ String address() { return address; } - /** Creates a queue on the test server. */ + /** Returns true if TLS is enabled on this server. */ + boolean isTlsEnabled() { + return tlsEnabled; + } + + /** Returns the CA certificate PEM bytes. Only valid when TLS is enabled. */ + byte[] caCertPem() { + return caCertPem; + } + + /** Returns the client certificate PEM bytes. Only valid when TLS is enabled. */ + byte[] clientCertPem() { + return clientCertPem; + } + + /** Returns the client private key PEM bytes. Only valid when TLS is enabled. */ + byte[] clientKeyPem() { + return clientKeyPem; + } + + /** Returns the bootstrap API key. Only valid when auth is enabled. */ + String apiKey() { + return apiKey; + } + + /** Creates a queue on the test server (plaintext mode). */ void createQueue(String name) { adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build()); } + /** Creates a queue using an authenticated admin stub (TLS + API key mode). */ + void createQueueWithApiKey(String name) { + // The admin channel was already created with TLS + API key interceptor + adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build()); + } + /** Stops the server and cleans up temporary files. */ void stop() { adminChannel.shutdown(); @@ -54,7 +108,24 @@ void stop() { deleteDirectory(dataDir); } - /** Starts a fila-server on a random port. */ + /** + * Returns true if the fila-server binary is available at a known local path. + * + *

Note: This intentionally does NOT check PATH. The TLS integration tests require a local dev + * build to ensure cert generation and server config are compatible. In CI, the plaintext + * integration tests run via {@link FilaClientTest} using the downloaded binary; the TLS tests are + * skipped until the CI pipeline is configured to provision TLS test infrastructure. + */ + static boolean isBinaryAvailable() { + try { + String path = findBinary(); + return path != null && Files.isExecutable(Path.of(path)); + } catch (Exception e) { + return false; + } + } + + /** Starts a fila-server on a random port (plaintext, no auth). */ static TestServer start() throws IOException, InterruptedException { int port = findFreePort(); String address = "127.0.0.1:" + port; @@ -76,7 +147,191 @@ static TestServer start() throws IOException, InterruptedException { } ManagedChannel adminChannel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); - return new TestServer(process, dataDir, address, adminChannel); + return new TestServer(process, dataDir, address, adminChannel, false, null, null, null, null); + } + + /** Starts a fila-server with TLS and API key auth on a random port. */ + static TestServer startWithTls() throws IOException, InterruptedException { + int port = findFreePort(); + String address = "127.0.0.1:" + port; + + Path dataDir = Files.createTempDirectory("fila-test-tls-"); + + // Generate self-signed CA, server cert, and client cert using openssl + generateCerts(dataDir); + + byte[] caCert = Files.readAllBytes(dataDir.resolve("ca.pem")); + byte[] clientCert = Files.readAllBytes(dataDir.resolve("client.pem")); + byte[] clientKey = Files.readAllBytes(dataDir.resolve("client-key.pem")); + + // Bootstrap API key for auth + String bootstrapKey = "test-bootstrap-key-" + System.currentTimeMillis(); + + Path configFile = dataDir.resolve("fila.toml"); + String config = + "[server]\n" + + "listen_addr = \"" + + address + + "\"\n" + + "\n" + + "[tls]\n" + + "ca_cert = \"" + + dataDir.resolve("ca.pem") + + "\"\n" + + "server_cert = \"" + + dataDir.resolve("server.pem") + + "\"\n" + + "server_key = \"" + + dataDir.resolve("server-key.pem") + + "\"\n" + + "\n" + + "[auth]\n" + + "bootstrap_apikey = \"" + + bootstrapKey + + "\"\n"; + Files.writeString(configFile, config); + + String binaryPath = findBinary(); + ProcessBuilder pb = new ProcessBuilder(binaryPath).redirectErrorStream(true); + pb.directory(dataDir.toFile()); + pb.environment().put("FILA_DATA_DIR", dataDir.resolve("db").toString()); + Process process = pb.start(); + + if (!waitForPort(port, 10_000)) { + process.destroyForcibly(); + deleteDirectory(dataDir); + throw new IOException("fila-server failed to start within 10s on " + address); + } + + // Create admin channel with TLS + API key + TlsChannelCredentials.Builder tlsBuilder = + TlsChannelCredentials.newBuilder().trustManager(new ByteArrayInputStream(caCert)); + tlsBuilder.keyManager( + new ByteArrayInputStream(clientCert), new ByteArrayInputStream(clientKey)); + ChannelCredentials creds = tlsBuilder.build(); + + ManagedChannel adminChannel = + Grpc.newChannelBuilderForAddress("127.0.0.1", port, creds) + .intercept(new ApiKeyInterceptor(bootstrapKey)) + .build(); + + return new TestServer( + process, dataDir, address, adminChannel, true, caCert, clientCert, clientKey, bootstrapKey); + } + + private static void generateCerts(Path dir) throws IOException, InterruptedException { + // Generate CA key and cert + exec( + dir, + "openssl", + "req", + "-x509", + "-newkey", + "ec", + "-pkeyopt", + "ec_paramgen_curve:prime256v1", + "-keyout", + "ca-key.pem", + "-out", + "ca.pem", + "-days", + "1", + "-nodes", + "-subj", + "/CN=fila-test-ca"); + + // Generate server key and CSR + exec( + dir, + "openssl", + "req", + "-newkey", + "ec", + "-pkeyopt", + "ec_paramgen_curve:prime256v1", + "-keyout", + "server-key.pem", + "-out", + "server.csr", + "-nodes", + "-subj", + "/CN=127.0.0.1"); + + // Write SAN extension file + Files.writeString( + dir.resolve("server-ext.cnf"), "subjectAltName=IP:127.0.0.1\nbasicConstraints=CA:FALSE\n"); + + // Sign server cert with CA + exec( + dir, + "openssl", + "x509", + "-req", + "-in", + "server.csr", + "-CA", + "ca.pem", + "-CAkey", + "ca-key.pem", + "-CAcreateserial", + "-out", + "server.pem", + "-days", + "1", + "-extfile", + "server-ext.cnf"); + + // Generate client key and CSR + exec( + dir, + "openssl", + "req", + "-newkey", + "ec", + "-pkeyopt", + "ec_paramgen_curve:prime256v1", + "-keyout", + "client-key.pem", + "-out", + "client.csr", + "-nodes", + "-subj", + "/CN=fila-test-client"); + + // Sign client cert with CA + exec( + dir, + "openssl", + "x509", + "-req", + "-in", + "client.csr", + "-CA", + "ca.pem", + "-CAkey", + "ca-key.pem", + "-CAcreateserial", + "-out", + "client.pem", + "-days", + "1"); + } + + private static void exec(Path workDir, String... cmd) throws IOException, InterruptedException { + ProcessBuilder pb = + new ProcessBuilder(cmd).directory(workDir.toFile()).redirectErrorStream(true); + Process p = pb.start(); + byte[] output = p.getInputStream().readAllBytes(); + int exitCode = p.waitFor(); + if (exitCode != 0) { + throw new IOException( + "Command failed: " + + String.join(" ", cmd) + + "\nExit code: " + + exitCode + + "\nOutput: " + + new String(output)); + } } private static String findBinary() { diff --git a/src/test/java/dev/faisca/fila/TlsAuthClientTest.java b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java new file mode 100644 index 0000000..ac3560b --- /dev/null +++ b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java @@ -0,0 +1,112 @@ +package dev.faisca.fila; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; + +/** + * Integration tests for TLS and API key authentication. + * + *

These tests require a fila-server running with TLS and auth enabled. They are skipped if the + * server binary is not available. + */ +@EnabledIf("serverAvailable") +class TlsAuthClientTest { + private static TestServer server; + + @BeforeAll + static void setUp() throws Exception { + server = TestServer.startWithTls(); + server.createQueueWithApiKey("test-tls-auth"); + } + + @AfterAll + static void tearDown() { + if (server != null) server.stop(); + } + + static boolean serverAvailable() { + return TestServer.isBinaryAvailable(); + } + + @Test + void connectWithTlsAndApiKey() throws Exception { + try (FilaClient client = + FilaClient.builder(server.address()) + .withTlsCaCert(server.caCertPem()) + .withTlsClientCert(server.clientCertPem(), server.clientKeyPem()) + .withApiKey(server.apiKey()) + .build()) { + String msgId = + client.enqueue("test-tls-auth", Map.of("secure", "true"), "tls payload".getBytes()); + assertNotNull(msgId); + assertFalse(msgId.isEmpty()); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference received = new AtomicReference<>(); + + ConsumerHandle handle = + client.consume( + "test-tls-auth", + msg -> { + received.set(msg); + client.ack("test-tls-auth", msg.getId()); + latch.countDown(); + }); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should receive message within 10s"); + handle.cancel(); + + ConsumeMessage msg = received.get(); + assertNotNull(msg); + assertEquals(msgId, msg.getId()); + assertArrayEquals("tls payload".getBytes(), msg.getPayload()); + } + } + + @Test + void connectWithTlsOnly() throws Exception { + // TLS without API key — validates TLS transport works independently of auth + try (FilaClient client = + FilaClient.builder(server.address()) + .withTlsCaCert(server.caCertPem()) + .withTlsClientCert(server.clientCertPem(), server.clientKeyPem()) + .build()) { + // Without an API key on an auth-enabled server, the enqueue should be rejected. + // This validates TLS transport is working (connection succeeds) but auth is enforced. + RpcException ex = + assertThrows( + RpcException.class, + () -> client.enqueue("test-tls-auth", Map.of(), "tls-only".getBytes())); + assertEquals( + io.grpc.Status.Code.UNAUTHENTICATED, + ex.getCode(), + "should reject with UNAUTHENTICATED when no API key is provided"); + } + } + + @Test + void rejectWithoutApiKey() { + try (FilaClient client = + FilaClient.builder(server.address()) + .withTlsCaCert(server.caCertPem()) + .withTlsClientCert(server.clientCertPem(), server.clientKeyPem()) + .build()) { + RpcException ex = + assertThrows( + RpcException.class, + () -> client.enqueue("test-tls-auth", Map.of(), "no-key".getBytes())); + assertEquals( + io.grpc.Status.Code.UNAUTHENTICATED, + ex.getCode(), + "should reject with UNAUTHENTICATED when no API key is provided"); + } + } +}