diff --git a/documents/configuration/ojp-jdbc-configuration.md b/documents/configuration/ojp-jdbc-configuration.md index df15af7de..b46a6e9ba 100644 --- a/documents/configuration/ojp-jdbc-configuration.md +++ b/documents/configuration/ojp-jdbc-configuration.md @@ -791,6 +791,98 @@ The multi-datasource feature is fully backward compatible: - Existing `ojp.properties` files without datasource prefixes continue to work as "default" datasource - No changes required for existing deployments unless you want to use multi-datasource features +## Read/Write Splitting Configuration + +OJP supports automatic read/write traffic splitting. Write operations (INSERT, UPDATE, DELETE, DDL) are always routed to the primary datasource. Stateless (auto-commit) read operations (SELECT, WITH, EXPLAIN, SHOW, DESCRIBE) are routed to a replica datasource when read/write splitting is enabled. All operations inside an explicit transaction are always routed to the primary. + +Read/write splitting is configured entirely through `ojp.properties`. No server-side configuration changes are needed. + +> **Note:** Sticky sessions are **opt-in** (default: `0` — disabled). Do not set a non-zero `stickySessionSeconds` unless your application requires read-your-writes guarantees after writes outside of a transaction. + +### Primary Datasource Properties + +Use the format `{primaryName}.ojp.readwrite.*` to configure the primary: + +| Property | Default | Description | +|---|---|---| +| `{primary}.ojp.readwrite.role` | — | Must be `primary` to enable read/write splitting for this datasource | +| `{primary}.ojp.readwrite.enabled` | `false` | Enable (`true`) or disable read/write splitting for this primary | +| `{primary}.ojp.readwrite.replicaSelectionStrategy` | `ROUND_ROBIN` | Replica selection strategy: `ROUND_ROBIN` or `RANDOM`. `LEAST_CONNECTIONS` is accepted but currently falls back to `ROUND_ROBIN`; metrics-based selection is planned for a future phase. 
| +| `{primary}.ojp.readwrite.stickySessionSeconds` | `0` | Read-your-writes window in seconds. `0` = disabled (opt-in). After a write, reads continue going to the primary for this many seconds before reverting to replica routing | +| `{primary}.ojp.readwrite.replicaFailoverToPrimary` | `true` | Fall back to the primary when no healthy replica is available | + +### Replica Datasource Properties + +Use the format `{replicaName}.ojp.*` to configure each replica: + +| Property | Default | Description | +|---|---|---| +| `{replica}.ojp.readwrite.role` | — | Must be `replica` | +| `{replica}.ojp.readwrite.primary` | — | Name of the primary datasource this replica belongs to | +| `{replica}.ojp.connection.url` | — | **Required.** JDBC URL of the replica database | +| `{replica}.ojp.connection.user` | `""` | Replica database user | +| `{replica}.ojp.connection.password` | `""` | Replica database password | +| `{replica}.ojp.pool.maxPoolSize` | `10` | Maximum connections in the replica pool | +| `{replica}.ojp.pool.minIdle` | `2` | Minimum idle connections | +| `{replica}.ojp.pool.connectionTimeout` | `30000` | Connection acquire timeout (ms) | +| `{replica}.ojp.pool.idleTimeout` | `600000` | Idle connection timeout (ms) | +| `{replica}.ojp.pool.maxLifetime` | `1800000` | Maximum connection lifetime (ms) | + +### Example Configuration + +```properties +# Primary datasource (read/write splitting enabled, no sticky session) +mydb.ojp.readwrite.role=primary +mydb.ojp.readwrite.enabled=true +mydb.ojp.readwrite.replicaSelectionStrategy=ROUND_ROBIN +# stickySessionSeconds defaults to 0 (disabled) — opt-in only when needed + +# Replica 1 +replica1.ojp.readwrite.role=replica +replica1.ojp.readwrite.primary=mydb +replica1.ojp.connection.url=jdbc:postgresql://replica1-host:5432/mydb +replica1.ojp.connection.user=app_ro +replica1.ojp.connection.password=secret +replica1.ojp.pool.maxPoolSize=15 +replica1.ojp.pool.minIdle=3 + +# Replica 2 +replica2.ojp.readwrite.role=replica 
+replica2.ojp.readwrite.primary=mydb +replica2.ojp.connection.url=jdbc:postgresql://replica2-host:5432/mydb +replica2.ojp.connection.user=app_ro +replica2.ojp.connection.password=secret +replica2.ojp.pool.maxPoolSize=15 +replica2.ojp.pool.minIdle=3 +``` + +### Sticky Sessions (Read-Your-Writes) + +Sticky sessions guarantee that a client which just executed a write will continue reading from the primary for a configurable window, giving replicas time to catch up. This is an opt-in behaviour because it introduces latency on the read path and is only necessary when your application requires seeing its own writes immediately outside of a transaction. + +```properties +# Enable 3-second sticky window (reads stay on primary for 3 s after every write) +mydb.ojp.readwrite.stickySessionSeconds=3 +``` + +**When to use sticky sessions:** +- Application executes a write and then immediately queries without a transaction, and must see the write +- Replication lag is measurable and the application is not latency-sensitive + +**When to leave sticky sessions disabled (`0`, the default):** +- All writes and their subsequent reads are wrapped in the same transaction (the transaction itself guarantees read-your-writes via the primary) +- The application tolerates eventual consistency for reads outside transactions + +### Routing Rules Summary + +| Operation | Inside Transaction | Sticky Window Active | Routes To | +|---|---|---|---| +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | — | — | Replica | +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | ✓ | — | Primary | +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | — | ✓ | Primary | +| INSERT / UPDATE / DELETE / DDL | — | — | Primary | +| INSERT / UPDATE / DELETE / DDL | ✓ | — | Primary | + ## Related Documentation - **[SSL/TLS Certificate Configuration Guide](ssl-tls-certificate-placeholders.md)** - Complete guide for configuring SSL/TLS certificates with property placeholders diff --git a/documents/configuration/ojp-server-configuration.md 
b/documents/configuration/ojp-server-configuration.md index 48cd1dd1d..87379da59 100644 --- a/documents/configuration/ojp-server-configuration.md +++ b/documents/configuration/ojp-server-configuration.md @@ -283,7 +283,18 @@ INFO SchemaCache - Schema cache updated with 42 tables For JDBC driver and client-side connection pool configuration, see: -- **[OJP JDBC Configuration](ojp-jdbc-configuration.md)** - JDBC driver setup and client connection pool settings +- **[OJP JDBC Configuration](ojp-jdbc-configuration.md)** — JDBC driver setup, client connection pool settings, and read/write splitting configuration + +### Read/Write Splitting + +OJP supports automatic read/write traffic splitting configured entirely through `ojp.properties` on the client side. No server-side settings are required. The server reads the `*.ojp.readwrite.*` properties forwarded by the driver and creates isolated replica connection pools automatically. + +Key points: +- Stateless auto-commit SELECTs are routed to a replica; all other operations go to the primary +- Operations inside an explicit transaction always use the primary +- **Sticky sessions are opt-in** — `stickySessionSeconds` defaults to `0` (disabled). Enable only when the application must read its own writes outside a transaction. A non-zero value keeps reads on the primary for that many seconds after every write. + +See **[OJP JDBC Configuration — Read/Write Splitting](ojp-jdbc-configuration.md#readwrite-splitting-configuration)** for the full property reference and examples. ## Configuration Methods @@ -557,8 +568,10 @@ INFO org.openjproxy.grpc.server.ServerConfiguration - Slow Query Slot Percenta - Increase timeouts in environments with occasional very slow queries 4. **Connection Pools**: Configure client-side pool sizes based on application requirements 5. **Request Size**: Increase for applications that handle large result sets +6. 
**Read/Write Splitting**: Size replica pools (`{replica}.ojp.pool.maxPoolSize`) to handle peak read traffic; leave `stickySessionSeconds` at `0` unless read-your-writes outside transactions is required ## Related Documentation - **[Slow Query Segregation Documentation](../designs/SLOW_QUERY_SEGREGATION.md)** - Detailed guide to the slow query segregation feature +- **[OJP JDBC Configuration](ojp-jdbc-configuration.md)** - Client-side pool settings and read/write splitting configuration - **[Example Configuration Properties](ojp-server-example.properties)** - Complete example configuration file with all settings \ No newline at end of file diff --git a/documents/ebook/appendix-c-glossary.md b/documents/ebook/appendix-c-glossary.md index 509ad58c6..653667dfb 100644 --- a/documents/ebook/appendix-c-glossary.md +++ b/documents/ebook/appendix-c-glossary.md @@ -232,7 +232,7 @@ The separation of slow queries into a dedicated connection pool to prevent them The mechanism by which clients find available service instances. In OJP multinode deployments, discovery is configuration-based (JDBC URL lists all servers). **Session Affinity** -Also called "sticky sessions," the routing of related requests to the same backend server. OJP implements session affinity for transactions and temporary tables. +Also called "sticky sessions," the routing of related requests to the same backend server. OJP implements session affinity for transactions and temporary tables. In the context of read/write splitting, sticky sessions ensure reads go to the primary for a configurable window after a write (see `stickySessionSeconds`). Sticky sessions are **opt-in** (default: disabled) — only enable when the application must see its own writes immediately outside of an explicit transaction. **Slow Query** A database query that takes significantly longer than average to execute. OJP automatically detects slow queries and can segregate them to prevent resource contention. 
diff --git a/documents/ebook/part2-chapter6-server-configuration.md b/documents/ebook/part2-chapter6-server-configuration.md index d4d8be48d..49f331117 100644 --- a/documents/ebook/part2-chapter6-server-configuration.md +++ b/documents/ebook/part2-chapter6-server-configuration.md @@ -430,12 +430,83 @@ export ojp.server.port=9059 The server logs its active configuration at INFO level during startup. Review this output to confirm your settings were applied correctly. If you see unexpected defaults, it means your configuration wasn't recognized—check for typos, case sensitivity, and format issues. +## 6.8 Read/Write Splitting + +OJP can automatically route read and write traffic to separate database instances. Write operations (INSERT, UPDATE, DELETE, DDL) always go to the primary. Stateless auto-commit reads (SELECT, WITH, EXPLAIN, SHOW, DESCRIBE) are routed to a replica chosen according to a configurable selection strategy. All operations inside an explicit transaction stay on the primary. + +Read/write splitting is configured entirely through the client's `ojp.properties` file — no server-side configuration changes are required. The OJP server reads the `*.ojp.readwrite.*` properties forwarded by the driver on first connection and creates isolated replica connection pools automatically. 
+
+### 6.8.1 Enabling Read/Write Splitting
+
+Mark the primary datasource and each replica in `ojp.properties`:
+
+```properties
+# Primary datasource
+mydb.ojp.readwrite.role=primary
+mydb.ojp.readwrite.enabled=true
+mydb.ojp.readwrite.replicaSelectionStrategy=ROUND_ROBIN
+
+# Replica 1
+replica1.ojp.readwrite.role=replica
+replica1.ojp.readwrite.primary=mydb
+replica1.ojp.connection.url=jdbc:postgresql://replica1:5432/mydb
+replica1.ojp.connection.user=app_ro
+replica1.ojp.connection.password=secret
+
+# Replica 2
+replica2.ojp.readwrite.role=replica
+replica2.ojp.readwrite.primary=mydb
+replica2.ojp.connection.url=jdbc:postgresql://replica2:5432/mydb
+replica2.ojp.connection.user=app_ro
+replica2.ojp.connection.password=secret
+```
+
+Three replica selection strategies are available:
+
+- **`ROUND_ROBIN`** (default) — distributes reads evenly across all replicas in order
+- **`RANDOM`** — picks a replica at random for each request
+- **`LEAST_CONNECTIONS`** — accepted, but currently falls back to `ROUND_ROBIN`; selection by fewest active connections is planned for a future phase (Phase 3)
+
+### 6.8.2 Sticky Sessions (Read-Your-Writes)
+
+Sticky sessions keep reads on the primary for a short window after every write, giving replicas time to catch up. This is **opt-in**: the default `stickySessionSeconds` is `0` (disabled).
+
+```properties
+# Keep reads on primary for 3 seconds after every write
+mydb.ojp.readwrite.stickySessionSeconds=3
+```
+
+> **Important:** Only enable sticky sessions when the application must see its own writes immediately outside of a transaction. If all reads following a write are in the same transaction, the transaction itself already guarantees read-your-writes via the primary, and sticky sessions add unnecessary overhead.
+ +### 6.8.3 Routing Rules + +| Operation | Inside Transaction | Sticky Window Active | Routes To | +|---|---|---|---| +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | — | — | Replica | +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | ✓ | — | Primary | +| SELECT / WITH / EXPLAIN / SHOW / DESCRIBE | — | ✓ | Primary | +| INSERT / UPDATE / DELETE / DDL | any | any | Primary | + +### 6.8.4 Replica Pool Configuration + +Each replica has its own connection pool. Size it to handle peak read traffic independently of the primary pool. + +| Property | Default | Description | +|---|---|---| +| `{replica}.ojp.pool.maxPoolSize` | `10` | Maximum replica pool size | +| `{replica}.ojp.pool.minIdle` | `2` | Minimum idle connections | +| `{replica}.ojp.pool.connectionTimeout` | `30000` | Acquire timeout (ms) | +| `{replica}.ojp.pool.idleTimeout` | `600000` | Idle connection timeout (ms) | +| `{replica}.ojp.pool.maxLifetime` | `1800000` | Maximum connection lifetime (ms) | + +For the complete property reference see [OJP JDBC Configuration — Read/Write Splitting](../../documents/configuration/ojp-jdbc-configuration.md#readwrite-splitting-configuration). + ## Summary OJP server configuration gives you precise control over server behavior, security, performance, and observability. The hierarchical configuration system with JVM properties and environment variables provides flexibility for different deployment scenarios. Default settings work well for most use cases, but understanding the available options lets you optimize for your specific workload. -Key configuration areas include core server settings for network and threading, security controls through IP whitelisting, logging levels for operational visibility, OpenTelemetry integration for observability, circuit breakers for resilience, and slow query segregation for performance under mixed workloads. Each area offers sensible defaults that you can refine based on monitoring data. 
+Key configuration areas include core server settings for network and threading, security controls through IP whitelisting, logging levels for operational visibility, OpenTelemetry integration for observability, circuit breakers for resilience, slow query segregation for performance under mixed workloads, and read/write splitting for scaling read traffic across replicas. Each area offers sensible defaults that you can refine based on monitoring data. Start simple, monitor closely, and adjust based on observed behavior. Good configuration emerges from understanding your workload and using OJP's flexibility to match it, not from cargo-culting settings from other environments. -**[IMAGE PROMPT: Create a summary mind map with "OJP Server Configuration" at the center. Six main branches radiating outward: "Core Settings" (server icon), "Security" (lock icon), "Logging" (document icon), "Telemetry" (graph icon), "Circuit Breaker" (shield icon), and "Slow Query Segregation" (speedometer icon). Each branch has 2-3 sub-branches with key points. Use colors to group related concepts and make it visually hierarchical. Style: Modern mind map with icons and color coding.]** +**[IMAGE PROMPT: Create a summary mind map with "OJP Server Configuration" at the center. Seven main branches radiating outward: "Core Settings" (server icon), "Security" (lock icon), "Logging" (document icon), "Telemetry" (graph icon), "Circuit Breaker" (shield icon), "Slow Query Segregation" (speedometer icon), and "Read/Write Splitting" (fork/branch icon). Each branch has 2-3 sub-branches with key points. Use colors to group related concepts and make it visually hierarchical. 
Style: Modern mind map with icons and color coding.]** diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/DatasourcePropertiesLoader.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/DatasourcePropertiesLoader.java index d886a8e67..6956801fb 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/DatasourcePropertiesLoader.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/DatasourcePropertiesLoader.java @@ -17,6 +17,12 @@ * For example, properties prefixed with {@code mainApp.ojp.connection.pool.*} belong to * the datasource named {@code mainApp}. Unprefixed {@code ojp.connection.pool.*} properties * belong to the implicit {@code "default"} datasource. + * + *

All {@code *.ojp.*} properties (e.g. read/write splitting, replica connection URLs) are + * forwarded to the server with their full keys intact so that server-side parsers such as + * {@code ReadWriteConfigurationParser} can find them. Pool and XA properties for the primary + * datasource are additionally forwarded with the datasource prefix stripped for backward + * compatibility with existing server-side pool configuration readers. */ @Slf4j public class DatasourcePropertiesLoader { @@ -65,9 +71,16 @@ private static void applyFileProperties(Properties result, Properties source, String prefixDot, boolean isDefault) { boolean found = false; for (String key : source.stringPropertyNames()) { - if (hasPrefixedOjpKey(key, prefixDot)) { - result.setProperty(key.substring(prefixDot.length()), source.getProperty(key)); + String value = source.getProperty(key); + if (hasPrefixedPoolOrXaKey(key, prefixDot)) { + // Pool and XA properties for this datasource: strip prefix for backward compat + // Example: "myapp.ojp.connection.pool.maxPoolSize=10" → "ojp.connection.pool.maxPoolSize=10" + result.setProperty(key.substring(prefixDot.length()), value); found = true; + } else if (isPrefixedOjpKey(key)) { + // Keep full key for read/write splitting and replica configs so the server can find them + // Example: "replica1.ojp.readwrite.primary=myapp" stays as-is + result.setProperty(key, value); } } if (!found && isDefault) { @@ -76,38 +89,55 @@ private static void applyFileProperties(Properties result, Properties source, } private static void applySystemProperties(Properties result, String prefixDot, boolean isDefault) { - for (String key : System.getProperties().stringPropertyNames()) { - String value = System.getProperty(key); - if (hasPrefixedOjpKey(key, prefixDot)) { - String std = key.substring(prefixDot.length()); - result.setProperty(std, value); - log.debug("Overriding property from system property: {} = {}", std, value); - } else if (isDefault && isUnprefixedOjpKey(key)) { - 
result.setProperty(key, value); - log.debug("Overriding property from system property: {} = {}", key, value); - } - } + applyNormalizedProperties(result, System.getProperties(), prefixDot, isDefault, "system property"); } private static void applyEnvProperties(Properties result, String prefixDot, boolean isDefault) { + Properties normalized = new Properties(); for (Map.Entry entry : System.getenv().entrySet()) { - String key = entry.getKey().toLowerCase().replace('_', '.'); - String value = entry.getValue(); - if (hasPrefixedOjpKey(key, prefixDot)) { + normalized.setProperty(entry.getKey().toLowerCase().replace('_', '.'), entry.getValue()); + } + applyNormalizedProperties(result, normalized, prefixDot, isDefault, "environment variable"); + } + + /** + * Applies properties from a source (system properties or environment variables) to the result. + */ + private static void applyNormalizedProperties(Properties result, Properties source, + String prefixDot, boolean isDefault, String sourceName) { + for (String key : source.stringPropertyNames()) { + String value = source.getProperty(key); + if (hasPrefixedPoolOrXaKey(key, prefixDot)) { String std = key.substring(prefixDot.length()); result.setProperty(std, value); - log.debug("Overriding property from environment variable: {} = {}", std, value); + log.debug("Overriding property from {}: {} = {}", sourceName, std, value); + } else if (isPrefixedOjpKey(key)) { + result.setProperty(key, value); + log.debug("Setting property from {} (full key): {} = {}", sourceName, key, value); } else if (isDefault && isUnprefixedOjpKey(key)) { result.setProperty(key, value); - log.debug("Overriding property from environment variable: {} = {}", key, value); + log.debug("Overriding property from {}: {} = {}", sourceName, key, value); } } } - private static boolean hasPrefixedOjpKey(String key, String prefixDot) { + /** + * Checks if a property key is a pool or XA property for a specific datasource. 
+ * These properties get their prefix stripped for backward compatibility. + */ + private static boolean hasPrefixedPoolOrXaKey(String key, String prefixDot) { return key.startsWith(prefixDot + OJP_POOL_PREFIX) || key.startsWith(prefixDot + OJP_XA_PREFIX); } + /** + * Returns true for any property that contains {@code .ojp.} in its key, i.e. any prefixed + * OJP property regardless of the datasource name prefix. These are forwarded with their + * full key so that server-side parsers (e.g. read/write splitting) can find them. + */ + private static boolean isPrefixedOjpKey(String key) { + return key.contains(".ojp."); + } + private static boolean isUnprefixedOjpKey(String key) { return key.startsWith(OJP_POOL_PREFIX) || key.startsWith(OJP_XA_PREFIX); } diff --git a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java index b282aeadf..f3fc54702 100644 --- a/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java +++ b/ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/Driver.java @@ -87,13 +87,26 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept connBuilder.addAllServerEndpoints(serverEndpoints); log.info("Adding {} server endpoint(s) to ConnectionDetails", serverEndpoints.size()); + // Build combined properties map: file (ojp.properties) takes lowest priority; + // inline ojp.* properties from info override the file so callers can supply + // read/write splitting and other configuration directly via + // DriverManager.getConnection(url, info) without a server-side properties file. 
+ Map propertiesMap = new HashMap<>(); if (ojpProperties != null && !ojpProperties.isEmpty()) { - // Convert Properties to Map - Map propertiesMap = new HashMap<>(); for (String key : ojpProperties.stringPropertyNames()) { propertiesMap.put(key, ojpProperties.getProperty(key)); } - + } + if (info != null) { + for (String key : info.stringPropertyNames()) { + // Forward any *.ojp.* properties (e.g. read/write splitting, replica config) + // and the top-level ojp.* properties (e.g. ojp.datasource.name). + if (key.contains(".ojp.") || key.startsWith("ojp.")) { + propertiesMap.put(key, info.getProperty(key)); + } + } + } + if (!propertiesMap.isEmpty()) { // Add cache configuration properties to the map try { CacheConfigurationBuilder.addCachePropertiesToMap(propertiesMap, dataSourceName); @@ -103,7 +116,7 @@ public java.sql.Connection connect(String url, Properties info) throws SQLExcept } connBuilder.addAllProperties(ProtoConverter.propertiesToProto(propertiesMap)); - log.debug("Loaded ojp.properties with {} properties for dataSource: {}", propertiesMap.size(), dataSourceName); + log.debug("Loaded {} properties for dataSource: {}", propertiesMap.size(), dataSourceName); } log.info("Calling connect() on statement service with URL: {}", connectionUrl); diff --git a/ojp-jdbc-driver/src/test/java/openjproxy/jdbc/H2ReadWriteSplittingEndToEndTest.java b/ojp-jdbc-driver/src/test/java/openjproxy/jdbc/H2ReadWriteSplittingEndToEndTest.java new file mode 100644 index 000000000..39addf889 --- /dev/null +++ b/ojp-jdbc-driver/src/test/java/openjproxy/jdbc/H2ReadWriteSplittingEndToEndTest.java @@ -0,0 +1,546 @@ +package openjproxy.jdbc; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; 
+import java.sql.SQLException; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * End-to-end integration tests for read/write traffic splitting through OJP JDBC driver and server. + * + *

Client-Side Configuration

+ * + *

+ * All read/write splitting configuration is supplied by the client via the {@link java.util.Properties} + * object passed to {@link java.sql.DriverManager#getConnection(String, Properties)}. No server-side + * properties file is required. The driver forwards these properties to the OJP server, which uses + * them to configure the primary/replica datasources on first connection. + *

+ * + *

Example configuration properties used by these tests:

+ *
+ * Properties props = new Properties();
+ * props.setProperty("user", "sa");
+ * props.setProperty("password", "");
+ * props.setProperty("ojp.datasource.name",                         "rw_e2e_ds");
+ * props.setProperty("rw_e2e_ds.ojp.readwrite.enabled",             "true");
+ * props.setProperty("rw_e2e_replica.ojp.readwrite.role",           "replica");
+ * props.setProperty("rw_e2e_replica.ojp.readwrite.primary",        "rw_e2e_ds");
+ * props.setProperty("rw_e2e_replica.ojp.connection.url",
+ *         "jdbc:h2:mem:rw_e2e_replica;DB_CLOSE_DELAY=-1");
+ * props.setProperty("rw_e2e_replica.ojp.connection.user",  "sa");
+ * props.setProperty("rw_e2e_replica.ojp.connection.password", "");
+ * 
+ * + *

Test Strategy: Dual Unsynchronized H2 Databases

+ * + *

+ * Two separate, intentionally unsynchronized H2 in-memory databases are used: + *

+ * + *

+ * Routing correctness is verified by checking which row is returned: + * id=2 / source="replica" means the query was routed to the replica; + * id=1 / source="primary" means it was routed to the primary. + *

+ * + *

Test Execution Requirements

+ * + * + * @see org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry + * @see org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceManager + * @see org.openjproxy.grpc.server.readwrite.ReadWriteConfigurationParser + */ +class H2ReadWriteSplittingEndToEndTest { + + private static final String OJP_HOST = "localhost:1059"; + private static final String USER = "sa"; + private static final String PASSWORD = ""; + private static final String PRIMARY_DATASOURCE_NAME = "rw_e2e_ds"; + private static final String REPLICA_DATASOURCE_NAME = "rw_e2e_replica"; + + private static boolean isH2TestEnabled; + + private Connection connection; + + @BeforeAll + static void setupClass() { + isH2TestEnabled = Boolean.parseBoolean(System.getProperty("enableH2Tests", "false")); + } + + @AfterEach + void tearDown() { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + // ignore + } + } + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + /** + * Returns JDBC URL that routes through OJP to the primary H2 database. + */ + private String primaryUrl() { + return "jdbc:ojp[" + OJP_HOST + "]_h2:mem:rw_e2e_primary;DB_CLOSE_DELAY=-1"; + } + + /** + * Returns JDBC URL that routes through OJP to the replica H2 database. + */ + private String replicaUrl() { + return "jdbc:ojp[" + OJP_HOST + "]_h2:mem:rw_e2e_replica;DB_CLOSE_DELAY=-1"; + } + + /** + * Builds the Properties for a primary-datasource connection, including the full + * read/write splitting configuration so the OJP server can configure routing on + * first use — no server-side properties file needed. 
+ */ + private Properties primaryProps() { + Properties props = new Properties(); + props.setProperty("user", USER); + props.setProperty("password", PASSWORD); + props.setProperty("ojp.datasource.name", PRIMARY_DATASOURCE_NAME); + // Read/write splitting config forwarded to the OJP server + props.setProperty(PRIMARY_DATASOURCE_NAME + ".ojp.readwrite.enabled", "true"); + props.setProperty(REPLICA_DATASOURCE_NAME + ".ojp.readwrite.role", "replica"); + props.setProperty(REPLICA_DATASOURCE_NAME + ".ojp.readwrite.primary", PRIMARY_DATASOURCE_NAME); + props.setProperty(REPLICA_DATASOURCE_NAME + ".ojp.connection.url", + "jdbc:h2:mem:rw_e2e_replica;DB_CLOSE_DELAY=-1"); + // Replica credentials: server uses these to open the replica pool + props.setProperty(REPLICA_DATASOURCE_NAME + ".ojp.connection.user", USER); + props.setProperty(REPLICA_DATASOURCE_NAME + ".ojp.connection.password", PASSWORD); + return props; + } + + /** + * Builds the Properties for a direct replica-datasource connection (used to seed the + * replica H2 database during test setup). 
+ */ + private Properties replicaProps() { + Properties props = new Properties(); + props.setProperty("user", USER); + props.setProperty("password", PASSWORD); + props.setProperty("ojp.datasource.name", REPLICA_DATASOURCE_NAME); + return props; + } + + /** + * Seeds both H2 databases through OJP so that each test starts from a known state: + * + */ + private void setupDatabases() throws SQLException { + // Seed primary + try (Connection c = DriverManager.getConnection(primaryUrl(), primaryProps()); + Statement s = c.createStatement()) { + s.execute("DROP TABLE IF EXISTS test_data"); + s.execute("CREATE TABLE test_data (id INT PRIMARY KEY, source VARCHAR(50))"); + s.execute("INSERT INTO test_data VALUES (1, 'primary')"); + } + + // Seed replica + try (Connection c = DriverManager.getConnection(replicaUrl(), replicaProps()); + Statement s = c.createStatement()) { + s.execute("DROP TABLE IF EXISTS test_data"); + s.execute("CREATE TABLE test_data (id INT PRIMARY KEY, source VARCHAR(50))"); + s.execute("INSERT INTO test_data VALUES (2, 'replica')"); + } + } + + // ----------------------------------------------------------------------- + // Tests + // ----------------------------------------------------------------------- + + /** + * SELECT outside a transaction must be routed to the replica (id=2). 
+ */ + @Test + void testSelectGoesToReplica_WithoutTransaction() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT id, source FROM test_data")) { + + assertTrue(rs.next(), "Should have at least one row"); + assertEquals(2, rs.getInt("id"), + "SELECT outside transaction should route to replica (id=2)"); + assertEquals("replica", rs.getString("source"), + "SELECT outside transaction should route to replica"); + } + } + + /** + * Multiple sequential SELECTs outside a transaction must all go to the replica. + */ + @Test + void testMultipleReads_AllGoToReplica() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + for (int i = 0; i < 3; i++) { + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT id, source FROM test_data")) { + + assertTrue(rs.next(), "Should have at least one row in iteration " + i); + assertEquals(2, rs.getInt("id"), + "SELECT #" + i + " should route to replica (id=2)"); + assertEquals("replica", rs.getString("source"), + "SELECT #" + i + " should route to replica"); + } + } + } + + /** + * INSERT must be routed to the primary and be visible there. 
+ */ + @Test + void testInsertGoesToPrimary() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + try (Statement stmt = connection.createStatement()) { + int affected = stmt.executeUpdate("INSERT INTO test_data VALUES (100, 'inserted')"); + assertEquals(1, affected, "INSERT should affect 1 row"); + } + + // Verify via a fresh primary connection (write went to primary) + try (Connection verify = DriverManager.getConnection(primaryUrl(), primaryProps())) { + // Use a transaction to force routing to primary + verify.setAutoCommit(false); + try (Statement s = verify.createStatement(); + ResultSet rs = s.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 100")) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1), "Primary database should contain the inserted row"); + } + verify.rollback(); + } + } + + /** + * UPDATE must be routed to the primary and the change must be visible there. 
+ */ + @Test + void testUpdateGoesToPrimary() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + try (Statement stmt = connection.createStatement()) { + int affected = stmt.executeUpdate("UPDATE test_data SET source = 'updated' WHERE id = 1"); + assertEquals(1, affected, "UPDATE should affect 1 row"); + } + + try (Connection verify = DriverManager.getConnection(primaryUrl(), primaryProps())) { + // Force select to go to primary for validation + verify.setAutoCommit(false); + try (Statement s = verify.createStatement(); + ResultSet rs = s.executeQuery("SELECT source FROM test_data WHERE id = 1")) { + assertTrue(rs.next()); + assertEquals("updated", rs.getString("source"), + "Primary database should show the updated value"); + } + verify.rollback(); + } + } + + /** + * DELETE must be routed to the primary; the row must be absent from the primary afterwards. + */ + @Test + void testDeleteGoesToPrimary() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + try (Statement stmt = connection.createStatement()) { + int affected = stmt.executeUpdate("DELETE FROM test_data WHERE id = 1"); + assertEquals(1, affected, "DELETE should affect 1 row"); + } + + try (Connection verify = DriverManager.getConnection(primaryUrl(), primaryProps())) { + // Force routing to primary (transactions always route to primary) so we verify + // the primary was actually modified and not the replica. 
+ verify.setAutoCommit(false); + try (Statement s = verify.createStatement(); + ResultSet rs = s.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 1")) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1), + "Primary database should not contain the deleted row"); + } + verify.rollback(); + } + } + + /** + * A write followed immediately by a read (no sticky session) demonstrates eventual + * consistency: the read goes to the replica and does not see the write. + */ + @Test + void testWriteThenRead_WithoutStickySession_DoesNotSeeWrite() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate("INSERT INTO test_data VALUES (150, 'eventual_consistency_test')"); + + try (ResultSet rs = stmt.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 150")) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1), + "Replica should not yet have the row just inserted into the primary"); + } + } + } + + /** + * With sticky session configured, a write followed immediately by a read SHOULD see + * the write (read-your-writes guarantee). After the sticky session expires, reads + * go back to the replica. 
+ */ + @Test + void testWriteThenRead_WithStickySession_SeesWrite() throws SQLException, InterruptedException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + String stickyPrimaryUrl = "jdbc:ojp[" + OJP_HOST + "]_h2:mem:rw_sticky_primary;DB_CLOSE_DELAY=-1"; + String stickyReplicaUrl = "jdbc:ojp[" + OJP_HOST + "]_h2:mem:rw_sticky_replica;DB_CLOSE_DELAY=-1"; + + Properties stickyProps = stickyProps(); + + // Setup separate sticky session databases + try (Connection c = DriverManager.getConnection(stickyPrimaryUrl, stickyProps); + Statement s = c.createStatement()) { + s.execute("DROP TABLE IF EXISTS test_data"); + s.execute("CREATE TABLE test_data (id INT PRIMARY KEY, source VARCHAR(50))"); + s.execute("INSERT INTO test_data VALUES (1, 'sticky_primary')"); + } + + Properties replicaOnlyProps = new Properties(); + replicaOnlyProps.setProperty("user", USER); + replicaOnlyProps.setProperty("password", PASSWORD); + replicaOnlyProps.setProperty("ojp.datasource.name", "rw_sticky_replica"); + + try (Connection c = DriverManager.getConnection(stickyReplicaUrl, replicaOnlyProps); + Statement s = c.createStatement()) { + s.execute("DROP TABLE IF EXISTS test_data"); + s.execute("CREATE TABLE test_data (id INT PRIMARY KEY, source VARCHAR(50))"); + s.execute("INSERT INTO test_data VALUES (2, 'sticky_replica')"); + } + + connection = DriverManager.getConnection(stickyPrimaryUrl, stickyProps); + + try (Statement stmt = connection.createStatement()) { + // Write to primary + stmt.executeUpdate("INSERT INTO test_data VALUES (160, 'sticky_write')"); + + // Immediate read: sticky session should route to primary → sees the write + try (ResultSet rs = stmt.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 160")) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1), + "With sticky session, read immediately after write should see the write (primary)"); + } + + // Wait for sticky session to expire (3 seconds + buffer) + Thread.sleep(3500); 
//NOSONAR - intentional wait for sticky session expiration + + // After expiration, read goes to replica → does not see the write + try (ResultSet rs = stmt.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 160")) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1), + "After sticky session expires, read should go to replica and not see the write"); + } + } + } + + /** + * Builds Properties with sticky session (3 second timeout) for read/write splitting. + */ + private Properties stickyProps() { + Properties props = new Properties(); + props.setProperty("user", USER); + props.setProperty("password", PASSWORD); + props.setProperty("ojp.datasource.name", "rw_sticky_ds"); + props.setProperty("rw_sticky_ds.ojp.readwrite.enabled", "true"); + props.setProperty("rw_sticky_ds.ojp.readwrite.stickySessionSeconds", "3"); + props.setProperty("rw_sticky_replica.ojp.readwrite.role", "replica"); + props.setProperty("rw_sticky_replica.ojp.readwrite.primary", "rw_sticky_ds"); + props.setProperty("rw_sticky_replica.ojp.connection.url", + "jdbc:h2:mem:rw_sticky_replica;DB_CLOSE_DELAY=-1"); + props.setProperty("rw_sticky_replica.ojp.connection.user", USER); + props.setProperty("rw_sticky_replica.ojp.connection.password", PASSWORD); + return props; + } + + /** + * All operations inside an explicit transaction must route to the primary. 
+ */ + @Test + void testTransaction_AllOperationsGoToPrimary() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + connection.setAutoCommit(false); + + try (Statement stmt = connection.createStatement()) { + // SELECT inside transaction → primary (id=1) + try (ResultSet rs = stmt.executeQuery("SELECT id, source FROM test_data")) { + assertTrue(rs.next(), "Should have at least one row"); + assertEquals(1, rs.getInt("id"), + "SELECT inside transaction should route to primary (id=1)"); + assertEquals("primary", rs.getString("source"), + "SELECT inside transaction should route to primary"); + } + + stmt.executeUpdate("INSERT INTO test_data VALUES (200, 'tx_inserted')"); + + try (ResultSet rs = stmt.executeQuery( + "SELECT source FROM test_data WHERE id = 200")) { + assertTrue(rs.next(), "Should see the row inserted in the same transaction"); + assertEquals("tx_inserted", rs.getString("source")); + } + + connection.commit(); + } finally { + connection.setAutoCommit(true); + } + } + + /** + * Without a sticky session, a read immediately after a committed transaction is expected to go + * to the replica (eventual consistency). The replica does not have the just-committed row. + * + *

DISABLED — current limitation: {@code setAutoCommit(true)} is not propagated to the server. + * + *

When the client calls {@code connection.setAutoCommit(true)} after committing, the OJP + * driver currently does NOT forward this to the server via {@code callResource} or any other + * gRPC call. As a result, the server-side physical JDBC connection remains in + * {@code autoCommit=false} mode even after the transaction has been committed. Because + * {@link org.openjproxy.grpc.server.Session#hasActiveTransaction()} checks + * {@code !primaryConnection.getAutoCommit()}, it continues to return {@code true}, and + * subsequent SELECT statements are pinned to the primary instead of being routed to the + * replica. + * + *

Note on propagating {@code setAutoCommit(true)} via {@code callResource}: + * Propagating this call is not straightforward and requires careful evaluation. Key concerns + * include: (1) the {@link com.openjproxy.grpc.TransactionInfo} embedded in + * {@link com.openjproxy.grpc.SessionInfo} would become stale after a {@code callResource} + * invocation (the response does not update {@code TransactionInfo}); (2) implicit-commit + * semantics on {@code setAutoCommit(true)} vary across database drivers and must be + * validated per supported database; and (3) interaction with the server-side connection pool + * cleanup logic needs to be verified. See the analysis document for full details. + * + * @see #testAfterTransactionCommit_ReadsGoToPrimary_WithNoStickySession + */ + @Disabled("setAutoCommit(true) is not propagated to the server — reads stay pinned to primary " + + "instead of routing to replica; propagating this requires careful evaluation, " + + "see Javadoc for details") + @SneakyThrows + @Test + void testAfterTransactionCommit_ReadsGoToReplica_WithNoStickySession() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + connection.setAutoCommit(false); + + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate("INSERT INTO test_data VALUES (250, 'post_tx_test')"); + connection.commit(); + } + + connection.setAutoCommit(true); + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 250")) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1), + "Without sticky session, SELECT after commit routes to replica which does not have the row"); + } + } + + /** + * Documents the current actual behavior: after an explicit transaction is committed, reads + * continue to go to the primary because the connection remains in + * 
{@code autoCommit=false} mode and the read/write splitter sees an active transaction. + * + *

{@link org.openjproxy.grpc.server.Session#hasActiveTransaction()} checks + * {@code !primaryConnection.getAutoCommit()}, which still returns {@code true} after the + * commit, causing all subsequent SELECT statements to be routed to the primary. + * The inserted row (id=251) is present on the primary, so the count is 1. + */ + @SneakyThrows + @Test + void testAfterTransactionCommit_ReadsGoToPrimary_WithNoStickySession() throws SQLException { + Assumptions.assumeTrue(isH2TestEnabled, "Skipping H2 tests - not enabled"); + + setupDatabases(); + + connection = DriverManager.getConnection(primaryUrl(), primaryProps()); + connection.setAutoCommit(false); + + try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate("INSERT INTO test_data VALUES (251, 'post_tx_primary_test')"); + connection.commit(); + } + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery( + "SELECT COUNT(*) FROM test_data WHERE id = 251")) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1), + "Current behavior: connection is in autoCommit=false mode, " + + "so hasActiveTransaction() returns true and SELECT routes to primary " + + "which has the newly inserted row"); + } + } +} diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/Session.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/Session.java index 13094dafa..655a8bf1e 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/Session.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/Session.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.openjproxy.grpc.server.cache.CacheConfiguration; +import javax.sql.DataSource; import javax.sql.XAConnection; import javax.transaction.xa.XAResource; import java.sql.CallableStatement; @@ -20,6 +21,17 @@ /** * Holds information about a session of a given client. + *

+ * Supports two construction modes: + *

*/ @Slf4j public class Session { @@ -29,7 +41,14 @@ public class Session { private final String connectionHash; @Getter private final String clientUUID; - private Connection connection; // Removed @Getter - custom getter below + /** Primary connection — may be null until lazily acquired from {@link #primaryDataSource}. */ + private volatile Connection primaryConnection; + /** Replica connection — null until lazily acquired via {@link #getOrCreateReplicaConnection()}. */ + private volatile Connection replicaConnection; + /** DataSource for lazy primary acquisition; null for eagerly-constructed (XA) sessions. */ + private final DataSource primaryDataSource; + /** DataSource for lazy replica acquisition; null when no replica is configured. */ + private final DataSource replicaDataSource; @Getter private final boolean isXA; @Getter @@ -53,6 +72,45 @@ public class Session { @Getter private final long creationTime; + /** + * Lazy dual-datasource constructor. No connections are acquired at + * construction time; they are obtained on demand when + * {@link #getConnection()} or {@link #getOrCreateReplicaConnection()} is + * first called. 
+ * + * @param primaryDataSource datasource for the primary database (never null) + * @param replicaDataSource datasource for a read replica; {@code null} when + * no replica is configured + * @param connectionHash connection hash identifying this datasource pair + * @param clientUUID client identifier + * @param cacheConfiguration optional query-cache configuration (may be null) + */ + public Session(DataSource primaryDataSource, DataSource replicaDataSource, + String connectionHash, String clientUUID, + CacheConfiguration cacheConfiguration) { + this.primaryDataSource = primaryDataSource; + this.replicaDataSource = replicaDataSource; + this.primaryConnection = null; + this.replicaConnection = null; + this.connectionHash = connectionHash; + this.clientUUID = clientUUID; + this.isXA = false; + this.xaConnection = null; + this.cacheConfiguration = cacheConfiguration; + this.sessionUUID = UUID.randomUUID().toString(); + this.closed = false; + this.creationTime = System.nanoTime(); + this.lastActivityTime = this.creationTime; + this.resultSetMap = new ConcurrentHashMap<>(); + this.statementMap = new ConcurrentHashMap<>(); + this.preparedStatementMap = new ConcurrentHashMap<>(); + this.callableStatementMap = new ConcurrentHashMap<>(); + this.lobMap = new ConcurrentHashMap<>(); + this.attrMap = new ConcurrentHashMap<>(); + } + + // ---- Eager constructors (kept for XA and legacy non-XA callers) ---- + public Session(Connection connection, String connectionHash, String clientUUID) { this(connection, connectionHash, clientUUID, false, null, null); } @@ -62,7 +120,10 @@ public Session(Connection connection, String connectionHash, String clientUUID, } public Session(Connection connection, String connectionHash, String clientUUID, boolean isXA, XAConnection xaConnection, CacheConfiguration cacheConfiguration) { - this.connection = connection; + this.primaryConnection = connection; + this.replicaConnection = null; + this.primaryDataSource = null; + this.replicaDataSource = 
null; this.connectionHash = connectionHash; this.clientUUID = clientUUID; this.isXA = isXA; @@ -102,7 +163,7 @@ public synchronized void bindXAConnection(XAConnection xaConn, Object backendSes if (xaConn == null && backendSession == null) { this.xaConnection = null; this.backendSession = null; - this.connection = null; + this.primaryConnection = null; this.xaResource = null; log.debug("Unbound XAConnection from session {}", sessionUUID); return; @@ -118,7 +179,7 @@ public synchronized void bindXAConnection(XAConnection xaConn, Object backendSes try { this.xaConnection = xaConn; this.backendSession = backendSession; - this.connection = xaConn.getConnection(); + this.primaryConnection = xaConn.getConnection(); this.xaResource = xaConn.getXAResource(); log.debug("Bound XAConnection to session {}", sessionUUID); } catch (SQLException e) { @@ -147,19 +208,23 @@ public void refreshConnection() throws SQLException { if (backendSession != null && backendSession instanceof org.openjproxy.xa.pool.XABackendSession) { org.openjproxy.xa.pool.XABackendSession xaBackendSession = (org.openjproxy.xa.pool.XABackendSession) backendSession; - this.connection = xaBackendSession.getConnection(); + this.primaryConnection = xaBackendSession.getConnection(); log.debug("Refreshed connection reference in session {}", sessionUUID); } } /** - * Gets the JDBC connection for this session. - * For XA sessions with pooled backend sessions, this returns the current - * connection from the backend session (which may change after sanitization). + * Gets the primary JDBC connection for this session. + *

+ * For lazy sessions (created with {@link #Session(DataSource, DataSource, String, String, CacheConfiguration)}), + * this acquires a connection from the primary datasource on first call and caches it + * for subsequent calls. For XA sessions with a pooled backend, the fresh connection + * is returned from the backend session. * - * @return the JDBC connection + * @return the primary JDBC connection, or {@code null} if the session has no primary + * datasource and no eagerly-supplied connection */ - public Connection getConnection() { + public synchronized Connection getConnection() { // For XA sessions with backend session, always get fresh connection reference // This ensures we get the updated connection after sanitization if (isXA && backendSession != null && backendSession instanceof org.openjproxy.xa.pool.XABackendSession) { @@ -167,8 +232,90 @@ public Connection getConnection() { (org.openjproxy.xa.pool.XABackendSession) backendSession; return xaBackendSession.getConnection(); } - // For non-XA sessions or pass-through XA sessions, return stored connection - return this.connection; + // Lazy acquisition for dual-datasource sessions + if (primaryConnection == null && primaryDataSource != null) { + try { + primaryConnection = primaryDataSource.getConnection(); + log.debug("Lazily acquired primary connection for session {}", sessionUUID); + } catch (SQLException e) { + throw new RuntimeException("Failed to acquire primary connection for session " + sessionUUID, e); + } + } + return primaryConnection; + } + + /** + * Gets (or lazily creates) the replica JDBC connection for this session. + *

+ * The connection is acquired from the replica datasource supplied at construction + * time and cached for subsequent calls. Returns {@code null} when no replica + * datasource was configured. + * + * @return the replica JDBC connection, or {@code null} if no replica is configured + * @throws SQLException if acquiring the connection from the pool fails + */ + public synchronized Connection getOrCreateReplicaConnection() throws SQLException { + return getOrCreateReplicaConnection(null); + } + + /** + * Gets (or lazily creates) the replica JDBC connection for this session. + *

+ * Uses the replica datasource supplied at construction time when available; + * falls back to {@code fallbackReplicaDs} when the session was created without + * a replica datasource (e.g. originally created for a write / INSERT). The + * acquired connection is cached and reused on subsequent calls. + * + * @param fallbackReplicaDs datasource to use when no replica datasource was set + * at construction time; may be {@code null} + * @return the replica JDBC connection, or {@code null} if no replica datasource + * is available + * @throws SQLException if acquiring the connection from the pool fails + */ + public synchronized Connection getOrCreateReplicaConnection(DataSource fallbackReplicaDs) throws SQLException { + if (replicaConnection == null) { + DataSource ds = (replicaDataSource != null) ? replicaDataSource : fallbackReplicaDs; + if (ds != null) { + replicaConnection = ds.getConnection(); + log.debug("[RW-SPLIT] getOrCreateReplicaConnection: CREATED new replica connection from {} datasource for session={}", + (replicaDataSource != null) ? "instance" : "fallback", sessionUUID); + } else { + log.debug("[RW-SPLIT] getOrCreateReplicaConnection: no datasource available (replicaDataSource=null, fallback=null) for session={}", + sessionUUID); + } + } else { + log.debug("[RW-SPLIT] getOrCreateReplicaConnection: REUSING existing replica connection for session={}", sessionUUID); + } + return replicaConnection; + } + + /** + * Returns {@code true} when the primary connection exists and has an open + * (non-autoCommit) transaction. Does not trigger lazy primary + * connection acquisition; returns {@code false} when no primary connection + * has been acquired yet (i.e. the session is replica-only so far). + * + *

This method is {@code synchronized} to ensure it sees the latest value + * of {@code primaryConnection} (e.g. after a concurrent {@link #getConnection()} + * call). + * + * @return {@code true} if there is an active transaction on the primary connection + */ + public synchronized boolean hasActiveTransaction() { + if (primaryConnection == null) { + log.debug("[RW-SPLIT] hasActiveTransaction: primaryConnection is null, returning false for session={}", sessionUUID); + return false; + } + try { + boolean active = !primaryConnection.getAutoCommit(); + log.debug("[RW-SPLIT] hasActiveTransaction: session={}, autoCommit={}, hasActiveTransaction={}", + sessionUUID, !active, active); + return active; + } catch (SQLException e) { + // Safety: assume transaction present if we cannot determine + log.warn("Could not determine autoCommit state for session {}; assuming active transaction", sessionUUID); + return true; + } } public SessionInfo getSessionInfo() { @@ -270,9 +417,22 @@ public void terminate() throws SQLException { } catch (SQLException e) { log.error("Error closing XA connection", e); } - } else if (connection != null) { - // For regular connections, close normally - this.connection.close(); + } else { + // Non-XA: close replica connection first (if acquired), then primary (if acquired) + if (replicaConnection != null) { + try { + replicaConnection.close(); + } catch (SQLException e) { + log.error("Error closing replica connection for session {}", sessionUUID, e); + } + } + if (primaryConnection != null) { + try { + primaryConnection.close(); + } catch (SQLException e) { + log.error("Error closing primary connection for session {}", sessionUUID, e); + } + } } //Clear session internal objects to free memory @@ -281,7 +441,8 @@ public void terminate() throws SQLException { this.resultSetMap = null; this.statementMap = null; this.preparedStatementMap = null; - this.connection = null; + this.primaryConnection = null; + this.replicaConnection = null; this.xaConnection = 
null; this.xaResource = null; this.backendSession = null; diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java index 121c79c40..15196f325 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManager.java @@ -2,6 +2,7 @@ import com.openjproxy.grpc.SessionInfo; +import javax.sql.DataSource; import javax.sql.XAConnection; import java.sql.CallableStatement; import java.sql.Connection; @@ -17,6 +18,32 @@ public interface SessionManager { void registerClientUUID(String connectionHash, String clientUUID); SessionInfo createSession(String clientUUID, Connection connection); + SessionInfo createSession(String clientUUID, String connHash, Connection connection); + /** + * Creates a lazy dual-datasource session. No JDBC connections are acquired + * at creation time; they are obtained on demand when + * {@link Session#getConnection()} or {@link Session#getOrCreateReplicaConnection()} + * is first called. + * + * @param clientUUID the client identifier + * @param primaryDataSource datasource for the primary database + * @param replicaDataSource datasource for the read replica; {@code null} when no replica is configured + * @return the new session info + */ + SessionInfo createSession(String clientUUID, DataSource primaryDataSource, DataSource replicaDataSource); + /** + * Creates a lazy dual-datasource session with an explicitly supplied connection hash. + * Use this overload when the caller already holds the primary's {@code connHash} + * (e.g. from the driver's request) to avoid relying on the potentially stale + * {@code connectionHashMap} lookup. 
+ * + * @param clientUUID the client identifier + * @param connHash the primary's connection hash (from the driver request) + * @param primaryDataSource datasource for the primary database + * @param replicaDataSource datasource for the read replica; {@code null} when no replica is configured + * @return the new session info + */ + SessionInfo createSession(String clientUUID, String connHash, DataSource primaryDataSource, DataSource replicaDataSource); SessionInfo createXASession(String clientUUID, Connection connection, XAConnection xaConnection); SessionInfo createDeferredXASession(String clientUUID, String connectionHash); Session getSession(SessionInfo sessionInfo); diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java index eac2b9ed4..5c6340284 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/SessionManagerImpl.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.openjproxy.grpc.server.cache.CacheConfiguration; +import javax.sql.DataSource; import javax.sql.XAConnection; import java.sql.CallableStatement; import java.sql.Connection; @@ -60,6 +61,37 @@ public SessionInfo createSession(String clientUUID, Connection connection) { return session.getSessionInfo(); } + @Override + public SessionInfo createSession(String clientUUID, String connHash, Connection connection) { + log.info("Create session for client uuid {} with connHash {}", clientUUID, connHash); + CacheConfiguration cacheConfig = getCacheConfiguration(connHash); + Session session = new Session(connection, connHash, clientUUID, false, null, cacheConfig); + log.info("Session {} created for client uuid {}", session.getSessionUUID(), clientUUID); + this.sessionMap.put(session.getSessionUUID(), session); + return session.getSessionInfo(); + } + + @Override + public SessionInfo 
createSession(String clientUUID, DataSource primaryDataSource, DataSource replicaDataSource) { + log.info("Create lazy dual-datasource session for client uuid {}", clientUUID); + String connectionHash = connectionHashMap.get(clientUUID); + CacheConfiguration cacheConfig = getCacheConfiguration(connectionHash); + Session session = new Session(primaryDataSource, replicaDataSource, connectionHash, clientUUID, cacheConfig); + log.info("Lazy session {} created for client uuid {}", session.getSessionUUID(), clientUUID); + this.sessionMap.put(session.getSessionUUID(), session); + return session.getSessionInfo(); + } + + @Override + public SessionInfo createSession(String clientUUID, String connHash, DataSource primaryDataSource, DataSource replicaDataSource) { + log.info("Create lazy dual-datasource session for client uuid {} with connHash {}", clientUUID, connHash); + CacheConfiguration cacheConfig = getCacheConfiguration(connHash); + Session session = new Session(primaryDataSource, replicaDataSource, connHash, clientUUID, cacheConfig); + log.info("Lazy session {} created for client uuid {}", session.getSessionUUID(), clientUUID); + this.sessionMap.put(session.getSessionUUID(), session); + return session.getSessionInfo(); + } + @Override public SessionInfo createXASession(String clientUUID, Connection connection, XAConnection xaConnection) { log.info("Create XA session for client uuid " + clientUUID); diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/StatementServiceImpl.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/StatementServiceImpl.java index 1c2899a93..c847de0d6 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/StatementServiceImpl.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/StatementServiceImpl.java @@ -85,6 +85,10 @@ public StatementServiceImpl(SessionManager sessionManager, CircuitBreakerRegistr // Per-datasource cache configurations (shared with SessionManager) Map cacheCfgMap = cacheConfigurationMap 
!= null ? cacheConfigurationMap : new ConcurrentHashMap<>(); + // Read/write splitting datasource registry + org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry readWriteRegistry = + new org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry(); + this.actionContext = new org.openjproxy.grpc.server.action.ActionContext( datasourceMap, @@ -94,6 +98,7 @@ public StatementServiceImpl(SessionManager sessionManager, CircuitBreakerRegistr dbNameMap, slowQuerySegregationManagers, cacheCfgMap, + readWriteRegistry, xaPoolProvider, XA_COORDINATOR, clusterHealthTracker, diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/ActionContext.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/ActionContext.java index 00eab51df..fc240c974 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/ActionContext.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/ActionContext.java @@ -84,6 +84,15 @@ public class ActionContext { */ private final Map cacheConfigurationMap; + // ========== Read/Write Splitting ========== + + /** + * Registry for managing primary and replica datasources for read/write splitting. + * Thread-safe, shared across all actions. 
+ */ + private final org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry readWriteDataSourceRegistry; + + // ========== XA Pool Provider ========== /** @@ -148,6 +157,7 @@ public ActionContext( Map dbNameMap, Map slowQuerySegregationManagers, Map cacheConfigurationMap, + org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry readWriteDataSourceRegistry, XAConnectionPoolProvider xaPoolProvider, MultinodeXaCoordinator xaCoordinator, ClusterHealthTracker clusterHealthTracker, @@ -163,6 +173,7 @@ public ActionContext( this.dbNameMap = dbNameMap; this.slowQuerySegregationManagers = slowQuerySegregationManagers; this.cacheConfigurationMap = cacheConfigurationMap; + this.readWriteDataSourceRegistry = readWriteDataSourceRegistry; this.xaPoolProvider = xaPoolProvider; this.xaCoordinator = xaCoordinator; this.clusterHealthTracker = clusterHealthTracker; @@ -202,6 +213,10 @@ public Map getSlowQuerySegregationManagers( public Map getCacheConfigurationMap() { return cacheConfigurationMap; } + public org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry getReadWriteDataSourceRegistry() { + return readWriteDataSourceRegistry; + } + public XAConnectionPoolProvider getXaPoolProvider() { return xaPoolProvider; diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java index cb19476af..ec8ac640d 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/connection/ConnectAction.java @@ -17,6 +17,9 @@ import org.openjproxy.grpc.server.action.util.ProcessClusterHealthAction; import org.openjproxy.grpc.server.pool.ConnectionPoolConfigurer; import org.openjproxy.grpc.server.pool.DataSourceConfigurationManager; +import org.openjproxy.grpc.server.readwrite.ReadWriteConfiguration; +import 
org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceManager; + import org.openjproxy.grpc.server.utils.ConnectionHashGenerator; import org.openjproxy.grpc.server.utils.UrlParser; @@ -227,6 +230,9 @@ private void handleRegularConnection(ActionContext context, ConnectionDetails co dsConfig.getDataSourceName(), connHash, ConnectionPoolProviderRegistry.getDefaultProvider().map(p -> p.id()).orElse("unknown"), maxPoolSize, minIdle); + + // Setup read/write splitting if configured + setupReadWriteSplitting(context, connectionDetails, connHash, ds, dsConfig.getDataSourceName()); } } catch (Exception e) { @@ -240,6 +246,30 @@ private void handleRegularConnection(ActionContext context, ConnectionDetails co lock.unlock(); } + // If the pool already existed and read/write splitting was not yet registered for this + // connHash, attempt setup now. The setupReadWriteSplitting implementation is idempotent: + // it skips silently when the primary is already mapped and replicas are registered. + org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry readWriteRegistry = + context.getReadWriteDataSourceRegistry(); + if (readWriteRegistry != null + && readWriteRegistry.getPrimaryName(connHash) == null + && connectionDetails.getPropertiesCount() > 0) { + DataSource existingDs = context.getDatasourceMap().get(connHash); + if (existingDs != null) { + try { + Properties clientProps = ConnectionPoolConfigurer.extractClientProperties(connectionDetails); + DataSourceConfigurationManager.DataSourceConfiguration dsConf = + DataSourceConfigurationManager.getConfiguration(clientProps); + setupReadWriteSplitting(context, connectionDetails, connHash, existingDs, + dsConf.getDataSourceName()); + } catch (Exception e) { + log.error("Failed to setup read/write splitting for connHash {} (pool already existed): {}", + connHash, e.getMessage(), e); + // Non-fatal: continue without read/write splitting + } + } + } + // Process cluster health from ConnectionDetails if provided. 
// This supports the driver's proactive cluster health push: after detecting a peer // server failure or recovery, the driver calls connect() on healthy servers with an @@ -318,4 +348,38 @@ private void handleRegularConnection(ActionContext context, ConnectionDetails co responseObserver.onCompleted(); } + + /** + * Sets up read/write splitting for the given datasource if configured. + * Creates and registers replica datasources in the ReadWriteDataSourceRegistry. + * + * @param context action context + * @param connectionDetails connection details with properties + * @param connHash connection hash for the primary + * @param ds primary datasource (already created) + * @param datasourceName name of the datasource + */ + private void setupReadWriteSplitting(ActionContext context, ConnectionDetails connectionDetails, + String connHash, DataSource ds, String datasourceName) { + if (context.getReadWriteDataSourceRegistry() == null) { + log.debug("ReadWriteDataSourceRegistry not available, skipping read/write splitting setup"); + return; + } + + try { + ReadWriteDataSourceManager rwManager = new ReadWriteDataSourceManager( + context.getReadWriteDataSourceRegistry()); + + ReadWriteConfiguration config = rwManager.setupReadWriteSplitting( + connectionDetails, connHash, ds, datasourceName); + + if (config != null) { + log.info("Read/write splitting successfully configured for datasource '{}'", datasourceName); + } + } catch (Exception e) { + log.error("Failed to setup read/write splitting for datasource '{}': {}", + datasourceName, e.getMessage(), e); + // Non-fatal: continue without read/write splitting + } + } } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/streaming/SessionConnectionHelper.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/streaming/SessionConnectionHelper.java index c43c535e1..a1cda83a7 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/streaming/SessionConnectionHelper.java +++ 
b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/streaming/SessionConnectionHelper.java @@ -91,20 +91,50 @@ private SessionConnectionHelper() { public static ConnectionSessionDTO sessionConnection(ActionContext context, SessionInfo sessionInfo, boolean startSessionIfNone) throws SQLException { + return sessionConnection(context, sessionInfo, startSessionIfNone, null); + } + + /** + * Same as {@link #sessionConnection(ActionContext, SessionInfo, boolean)} but + * allows the caller to supply a {@code replicaDataSource} that overrides the + * pooled-mode datasource lookup. When {@code replicaDataSource} is non-{@code null} + * and the session has no existing UUID (i.e. stateless / auto-commit mode), the + * connection is acquired from {@code replicaDataSource} instead of the primary pool. + * XA, unpooled, and existing-session paths are not affected by this parameter. + * + * @param context the action context containing the session manager + * @param sessionInfo current sessionInfo object + * @param startSessionIfNone if {@code true} a new session will be started when none exists + * @param replicaDataSource optional datasource to use in place of the primary pool; + * {@code null} means "use the primary pool as normal" + * @return ConnectionSessionDTO + * @throws SQLException if a connection cannot be obtained + */ + public static ConnectionSessionDTO sessionConnection(ActionContext context, SessionInfo sessionInfo, + boolean startSessionIfNone, + javax.sql.DataSource replicaDataSource) + throws SQLException { ConnectionSessionDTO.ConnectionSessionDTOBuilder dtoBuilder = ConnectionSessionDTO.builder(); dtoBuilder.session(sessionInfo); - Connection conn; + Connection conn = null; var sessionManager = context.getSessionManager(); if (StringUtils.isNotEmpty(sessionInfo.getSessionUUID())) { - // Session already exists, reuse its connection - conn = sessionManager.getConnection(sessionInfo); - if (conn == null) { - throw new SQLException("Connection not found 
for this sessionInfo"); - } - dtoBuilder.dbName(DatabaseUtils.resolveDbName(conn.getMetaData().getURL())); - if (conn.isClosed()) { - throw new SQLException("Connection is closed"); + if (replicaDataSource != null) { + // Replica-routed request on an existing session: do not trigger lazy primary + // acquisition here. ExecuteQueryAction will call + // session.getOrCreateReplicaConnection() to get the replica connection. + // conn remains null — caller is responsible for obtaining the right connection. + } else { + // Primary path: get the connection from the existing session (may lazily acquire) + conn = sessionManager.getConnection(sessionInfo); + if (conn == null) { + throw new SQLException("Connection not found for this sessionInfo"); + } + dtoBuilder.dbName(DatabaseUtils.resolveDbName(conn.getMetaData().getURL())); + if (conn.isClosed()) { + throw new SQLException("Connection is closed"); + } } } else { // Lazy allocation: check if this is an XA or regular connection @@ -160,31 +190,49 @@ public static ConnectionSessionDTO sessionConnection(ActionContext context, Sess throw e; } } else { - // Pooled mode: acquire from datasource (HikariCP by default) - DataSource dataSource = context.getDatasourceMap().get(connHash); - if (dataSource == null) { + // Pooled mode: create a lazy dual-datasource session when a replica is + // provided so that the primary connection is only acquired if actually + // needed (e.g. for a subsequent write on the same session). + DataSource primaryDataSource = context.getDatasourceMap().get(connHash); + + if (primaryDataSource == null) { // Signal the client to reconnect. NOT_FOUND is caught by // CommandExecutionHelper and translated to Status.NOT_FOUND so that the // driver can transparently reconnect and retry the SQL call. 
throw new PoolNotFoundException(connHash); } - try { - // Use enhanced connection acquisition with timeout protection - conn = ConnectionAcquisitionManager.acquireConnection(dataSource, connHash); - log.debug("Successfully acquired connection from pool for hash: {}", connHash); - } catch (SQLException e) { - log.error("Failed to acquire connection from pool for hash: {}. Error: {}", - connHash, e.getMessage()); + if (replicaDataSource != null) { + // Replica path: create a lazy session with both datasources. + // No connection is acquired here; the caller (ExecuteQueryAction) will + // obtain the replica connection via session.getOrCreateReplicaConnection(). + if (startSessionIfNone) { + SessionInfo updatedSession = sessionManager.createSession( + sessionInfo.getClientUUID(), sessionInfo.getConnHash(), + primaryDataSource, replicaDataSource); + dtoBuilder.session(updatedSession); + } + // conn remains null — caller must resolve the connection from the session + } else { + // Primary path: eager acquisition as before + try { + // Use enhanced connection acquisition with timeout protection + conn = ConnectionAcquisitionManager.acquireConnection(primaryDataSource, connHash); + log.debug("Successfully acquired connection from pool for hash: {}", connHash); + } catch (SQLException e) { + log.error("Failed to acquire connection from pool for hash: {}. 
Error: {}", + connHash, e.getMessage()); - // Re-throw the enhanced exception from ConnectionAcquisitionManager - throw e; - } - } + // Re-throw the enhanced exception from ConnectionAcquisitionManager + throw e; + } - if (startSessionIfNone) { - SessionInfo updatedSession = sessionManager.createSession(sessionInfo.getClientUUID(), conn); - dtoBuilder.session(updatedSession); + if (startSessionIfNone) { + SessionInfo updatedSession = sessionManager.createSession( + sessionInfo.getClientUUID(), sessionInfo.getConnHash(), conn); + dtoBuilder.session(updatedSession); + } + } } } } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteQueryAction.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteQueryAction.java index f746d0f37..2462bd50e 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteQueryAction.java +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteQueryAction.java @@ -5,13 +5,18 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.openjproxy.grpc.ProtoConverter; import org.openjproxy.grpc.dto.Parameter; import org.openjproxy.grpc.server.ConnectionSessionDTO; +import org.openjproxy.grpc.server.Session; import org.openjproxy.grpc.server.action.Action; import org.openjproxy.grpc.server.action.ActionContext; import org.openjproxy.grpc.server.cache.CacheConfiguration; import org.openjproxy.grpc.server.cache.QueryCacheHelper; +import org.openjproxy.grpc.server.readwrite.ReadReplicaSelector; +import org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry; +import org.openjproxy.grpc.server.readwrite.ReadWriteSqlClassifier; import org.openjproxy.grpc.server.statement.StatementFactory; import javax.sql.DataSource; @@ -30,6 +35,8 @@ public class ExecuteQueryAction implements Action { 
private static final ExecuteQueryAction INSTANCE = new ExecuteQueryAction(); + private static final ReadReplicaSelector REPLICA_SELECTOR = new ReadReplicaSelector(); + /** * Private constructor for singleton. */ @@ -60,7 +67,41 @@ public void execute(ActionContext context, StatementRequest request, StreamObser private void executeQueryInternal(ActionContext actionContext, StatementRequest request, StreamObserver responseObserver) throws SQLException { - ConnectionSessionDTO dto = sessionConnection(actionContext, request.getSession(), true); + // Read/write splitting: route reads to a replica when applicable + DataSource replicaDs = resolveReadReplicaDataSource(actionContext, request); + + // Always start a session (startSessionIfNone=true) so the session UUID is returned + // to the driver — the client may need it for metadata access on the result set, + // statement, or connection. For the replica path, a lazy dual-datasource session is + // created; the actual replica connection is resolved below via the Session object. + ConnectionSessionDTO dto = sessionConnection(actionContext, request.getSession(), true, replicaDs); + + // Resolve the JDBC connection to use for execution: + // - replica path → session.getOrCreateReplicaConnection() (lazy; never allocates primary) + // - primary path → dto.getConnection() (already acquired by sessionConnection) + Connection execConn; + if (replicaDs != null) { + Session activeSession = actionContext.getSessionManager().getSession(dto.getSession()); + if (activeSession == null) { + throw new SQLException("Session not found for UUID: " + dto.getSession().getSessionUUID() + + ". 
Cannot obtain replica connection."); + } + execConn = activeSession.getOrCreateReplicaConnection(replicaDs); + log.debug("[RW-SPLIT] executeQueryInternal: using REPLICA connection for sessionUUID={}, connHash={}", + dto.getSession().getSessionUUID(), request.getSession().getConnHash()); + } else { + execConn = dto.getConnection(); + log.debug("[RW-SPLIT] executeQueryInternal: using PRIMARY connection for sessionUUID={}, connHash={}", + dto.getSession().getSessionUUID(), request.getSession().getConnHash()); + } + + // Build an execution DTO with the resolved connection so StatementFactory and + // downstream helpers receive the correct connection regardless of routing. + ConnectionSessionDTO execDto = ConnectionSessionDTO.builder() + .session(dto.getSession()) + .connection(execConn) + .dbName(dto.getDbName()) + .build(); // Phase 6: Cache Lookup (before query execution) - with graceful degradation String sql = request.getSql(); @@ -108,8 +149,8 @@ private void executeQueryInternal(ActionContext actionContext, StatementRequest DataSource dataSource = datasourceMap.get(dsKey); if (dataSource != null) { - // Get catalog and schema from the connection - Connection connection = dto.getConnection(); + // Get catalog and schema from the execution connection + Connection connection = execConn; String catalogName = connection.getCatalog(); String schemaName = connection.getSchema(); @@ -152,14 +193,97 @@ private void executeQueryInternal(ActionContext actionContext, StatementRequest responseObserver, cacheConfig, sql, params, dto.getSession().getConnHash()); if (CollectionUtils.isNotEmpty(params)) { - PreparedStatement ps = StatementFactory.createPreparedStatement(sessionManager, dto, sql, params, request); + PreparedStatement ps = StatementFactory.createPreparedStatement(sessionManager, execDto, sql, params, request); String resultSetUUID = sessionManager.registerResultSet(dto.getSession(), ps.executeQuery()); handleResultSet(actionContext, dto.getSession(), resultSetUUID, 
finalObserver); } else { - Statement stmt = StatementFactory.createStatement(sessionManager, dto.getConnection(), request); + Statement stmt = StatementFactory.createStatement(sessionManager, execDto.getConnection(), request); String resultSetUUID = sessionManager.registerResultSet(dto.getSession(), stmt.executeQuery(sql)); handleResultSet(actionContext, dto.getSession(), resultSetUUID, finalObserver); } } + + /** + * Determines whether this read query should be routed to a replica, and if so + * returns the selected replica {@link DataSource}. + * + *

Routing to a replica is skipped when any of the following is true:
+ * <ul>
+ *   <li>no {@code ReadWriteDataSourceRegistry} is available in the {@code ActionContext}</li>
+ *   <li>the session UUID refers to an expired or unknown session</li>
+ *   <li>the existing session has an active transaction on the primary</li>
+ *   <li>the SQL is not classified as a {@code READ} statement</li>
+ *   <li>no primary mapping is registered for the connection hash</li>
+ *   <li>the sticky-session window is active after a recent write</li>
+ *   <li>no replicas are registered for the primary</li>
+ * </ul>
+ *

Note: the presence of a {@code sessionUUID} alone does not block + * replica routing. The session UUID may exist from a previous autoCommit SELECT + * on the same connection, and in that case routing to the replica is still safe. + * + * @param context the action context + * @param request the statement request + * @return a replica {@link DataSource}, or {@code null} when the primary should be used + */ + private DataSource resolveReadReplicaDataSource(ActionContext context, StatementRequest request) { + ReadWriteDataSourceRegistry registry = context.getReadWriteDataSourceRegistry(); + if (registry == null) { + return null; + } + + String sessionUUID = request.getSession().getSessionUUID(); + String connHash = request.getSession().getConnHash(); + log.debug("[RW-SPLIT] resolveReadReplicaDataSource: connHash={}, sessionUUID={}, sql={}", + connHash, sessionUUID, + request.getSql().length() > 60 ? request.getSql().substring(0, 60) + "..." : request.getSql()); + + // Block replica routing only when there is an active transaction on the primary. + // A session UUID being present is not sufficient — it may come from a previous + // autoCommit SELECT. We check the actual transaction state via hasActiveTransaction() + // which does NOT trigger lazy primary connection acquisition. + if (StringUtils.isNotBlank(sessionUUID)) { + Session existingSession = context.getSessionManager().getSession(request.getSession()); + if (existingSession == null) { + // Session has expired or been invalidated; fall back to primary to avoid + // routing to replica with unknown session state. 
+ log.debug("[RW-SPLIT] session not found for UUID={}, routing to primary", sessionUUID); + return null; + } + if (existingSession.hasActiveTransaction()) { + log.debug("[RW-SPLIT] active transaction on session={}, routing to primary", sessionUUID); + return null; // active transaction → must stay on primary + } + } + + // Only route read-only SQL to replicas + ReadWriteSqlClassifier.QueryType queryType = ReadWriteSqlClassifier.classify(request.getSql()); + if (queryType != ReadWriteSqlClassifier.QueryType.READ) { + log.debug("[RW-SPLIT] SQL classified as {}, routing to primary (connHash={})", queryType, connHash); + return null; + } + + String primaryName = registry.getPrimaryName(connHash); + if (primaryName == null) { + log.debug("[RW-SPLIT] no primary mapping for connHash={}, routing to primary", connHash); + return null; + } + + // Honour sticky session: after a write, reads go to primary until timeout expires + if (registry.isStickyActive(primaryName)) { + log.debug("Read/write splitting: sticky session active for primary '{}', routing to primary", primaryName); + return null; + } + + List replicas = registry.getReplicas(primaryName); + if (replicas.isEmpty()) { + log.debug("[RW-SPLIT] no replicas registered for primary='{}', routing to primary", primaryName); + return null; + } + + DataSource selected = REPLICA_SELECTOR.select(primaryName, replicas, registry.getStrategy(primaryName)); + log.debug("[RW-SPLIT] routed READ to replica for primary='{}', connHash={}, sessionUUID={}", + primaryName, connHash, sessionUUID); + return selected; + } } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteUpdateAction.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteUpdateAction.java index 9fc6472d8..d8b9be671 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteUpdateAction.java +++ 
b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/ExecuteUpdateAction.java @@ -148,6 +148,10 @@ private OpResult executeUpdateInternal(ActionContext actionContext, StatementReq // Phase 9: Cache Invalidation (after successful update) org.openjproxy.grpc.server.cache.QueryCacheHelper.invalidateCacheIfEnabled(actionContext, dto.getSession(), request.getSql()); + // Read/write splitting: mark write for sticky session (routes subsequent reads to primary) + markStickySessionAfterWrite(actionContext, dto); + + return result; } finally { closeStatementAndConnectionIfNoSession(dto, stmt); @@ -277,4 +281,25 @@ private void closeStatementAndConnectionIfNoSession(ConnectionSessionDTO dto, St } } } + + /** + * Marks a write on the sticky-session tracker so that subsequent reads within the + * configured window are routed to the primary (read-your-writes guarantee). + * This is a no-op when read/write splitting is not configured. + * + * @param actionContext the action context + * @param dto the connection and session DTO used for the write + */ + private void markStickySessionAfterWrite(ActionContext actionContext, ConnectionSessionDTO dto) { + var registry = actionContext.getReadWriteDataSourceRegistry(); + if (registry == null || dto.getSession() == null) { + return; + } + String connHash = dto.getSession().getConnHash(); + String primaryName = registry.getPrimaryName(connHash); + if (primaryName != null) { + registry.markWrite(primaryName); + log.debug("Read/write splitting: sticky session marked for primary '{}' after write", primaryName); + } + } } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/StartTransactionAction.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/StartTransactionAction.java index e4c665c92..89973539d 100644 --- a/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/StartTransactionAction.java +++ 
b/ojp-server/src/main/java/org/openjproxy/grpc/server/action/transaction/StartTransactionAction.java @@ -75,7 +75,7 @@ public void execute(ActionContext context, SessionInfo sessionInfo, StreamObserv // Start a session if none started yet. if (StringUtils.isEmpty(sessionInfo.getSessionUUID())) { Connection conn = context.getDatasourceMap().get(sessionInfo.getConnHash()).getConnection(); - activeSessionInfo = sessionManager.createSession(sessionInfo.getClientUUID(), conn); + activeSessionInfo = sessionManager.createSession(sessionInfo.getClientUUID(), sessionInfo.getConnHash(), conn); // Preserve targetServer from incoming request activeSessionInfo = SessionInfoUtils.withTargetServer(activeSessionInfo, getTargetServer(sessionInfo)); } diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelector.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelector.java new file mode 100644 index 000000000..2cfcce35b --- /dev/null +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelector.java @@ -0,0 +1,62 @@ +package org.openjproxy.grpc.server.readwrite; + +import javax.sql.DataSource; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Selects a replica {@link DataSource} from the list of registered replicas + * using the configured {@link ReadWriteConfiguration.ReplicaSelectionStrategy}. + * + *

Supported strategies:
+ * <ul>
+ *   <li>{@code ROUND_ROBIN} — cycles through replicas using a per-primary {@link AtomicInteger} counter (default)</li>
+ *   <li>{@code RANDOM} — uniform random selection via {@link ThreadLocalRandom}</li>
+ *   <li>{@code LEAST_CONNECTIONS} — accepted but currently falls back to round-robin until pool metrics are available</li>
+ * </ul>
+ *

Thread Safety: this class is thread-safe. Round-robin counters use + * {@link AtomicInteger} and are stored in a {@link ConcurrentHashMap}. + */ +public class ReadReplicaSelector { + + private final Map roundRobinCounters = new ConcurrentHashMap<>(); + + /** + * Selects a replica {@link DataSource} using the given strategy. + * + * @param primaryName the name of the primary datasource (used as a key for + * per-primary counters) + * @param replicas the list of available replica datasources; must not be + * {@code null} + * @param strategy the replica selection strategy + * @return the selected {@link DataSource}, or {@code null} if + * {@code replicas} is empty + */ + public DataSource select(String primaryName, List replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy strategy) { + if (replicas.isEmpty()) { + return null; + } + return switch (strategy) { + case RANDOM -> replicas.get(ThreadLocalRandom.current().nextInt(replicas.size())); + // LEAST_CONNECTIONS falls back to ROUND_ROBIN until pool metrics are available + default -> roundRobin(primaryName, replicas); + }; + } + + private DataSource roundRobin(String primaryName, List replicas) { + AtomicInteger counter = roundRobinCounters + .computeIfAbsent(primaryName, k -> new AtomicInteger(0)); + // Use bitwise AND to avoid negative index when counter wraps past Integer.MAX_VALUE + int idx = (counter.getAndIncrement() & Integer.MAX_VALUE) % replicas.size(); + return replicas.get(idx); + } +} diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfiguration.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfiguration.java new file mode 100644 index 000000000..88b5b667e --- /dev/null +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfiguration.java @@ -0,0 +1,177 @@ +package org.openjproxy.grpc.server.readwrite; + +import lombok.Getter; +import lombok.ToString; + +import java.util.ArrayList; +import 
java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Configuration for read/write splitting feature. + * Holds settings for a primary datasource and its associated read replicas. + */ +@Getter +@ToString +public class ReadWriteConfiguration { + + /** + * Name of the primary datasource + */ + private final String primaryName; + + /** + * Whether read/write splitting is enabled for this primary + */ + private final boolean enabled; + + /** + * Replica selection strategy (ROUND_ROBIN, RANDOM, LEAST_CONNECTIONS) + */ + private final ReplicaSelectionStrategy strategy; + + /** + * Duration in seconds to stick to primary after a write operation. + * This ensures read-your-writes consistency. + */ + private final int stickySessionSeconds; + + /** + * Whether to fail over to primary when all replicas are unavailable + */ + private final boolean failoverToPrimary; + + /** + * List of replica datasource names associated with this primary + */ + private final List replicaNames; + + /** + * Replica selection strategies + */ + public enum ReplicaSelectionStrategy { + ROUND_ROBIN, + RANDOM, + LEAST_CONNECTIONS + } + + /** + * Creates a new ReadWriteConfiguration + * + * @param primaryName name of the primary datasource + * @param enabled whether read/write splitting is enabled + * @param strategy replica selection strategy + * @param stickySessionSeconds sticky session duration in seconds + * @param failoverToPrimary whether to failover to primary when replicas unavailable + * @param replicaNames list of replica datasource names + */ + public ReadWriteConfiguration(String primaryName, boolean enabled, ReplicaSelectionStrategy strategy, + int stickySessionSeconds, boolean failoverToPrimary, List replicaNames) { + this.primaryName = Objects.requireNonNull(primaryName, "primaryName cannot be null"); + this.enabled = enabled; + this.strategy = Objects.requireNonNull(strategy, "strategy cannot be null"); + this.stickySessionSeconds = 
stickySessionSeconds; + this.failoverToPrimary = failoverToPrimary; + this.replicaNames = Collections.unmodifiableList(new ArrayList<>( + Objects.requireNonNull(replicaNames, "replicaNames cannot be null"))); + } + + /** + * Validates this configuration + * + * @throws IllegalArgumentException if configuration is invalid + */ + public void validate() { + if (primaryName == null || primaryName.trim().isEmpty()) { + throw new IllegalArgumentException("Primary datasource name cannot be empty"); + } + + if (enabled && replicaNames.isEmpty()) { + throw new IllegalArgumentException( + "Read/write splitting is enabled but no replicas configured for primary: " + primaryName); + } + + // stickySessionSeconds is optional - 0 means disabled, positive values enable sticky sessions + if (stickySessionSeconds < 0) { + throw new IllegalArgumentException("stickySessionSeconds cannot be negative: " + stickySessionSeconds); + } + + // Check for duplicate replica names + if (replicaNames.size() != replicaNames.stream().distinct().count()) { + throw new IllegalArgumentException("Duplicate replica names found for primary: " + primaryName); + } + } + + /** + * Checks if read/write splitting has replicas configured + * + * @return true if enabled and has replicas + */ + public boolean hasReplicas() { + return enabled && !replicaNames.isEmpty(); + } + + /** + * Gets the number of configured replicas + * + * @return replica count + */ + public int getReplicaCount() { + return replicaNames.size(); + } + + /** + * Builder for ReadWriteConfiguration + */ + public static class Builder { + private String primaryName; + private boolean enabled = false; + private ReplicaSelectionStrategy strategy = ReplicaSelectionStrategy.ROUND_ROBIN; + private int stickySessionSeconds = 0; + private boolean failoverToPrimary = true; + private List<String> replicaNames = new ArrayList<>(); + + public Builder primaryName(String primaryName) { + this.primaryName = primaryName; + return this; + } + + public Builder 
enabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + public Builder strategy(ReplicaSelectionStrategy strategy) { + this.strategy = strategy; + return this; + } + + public Builder stickySessionSeconds(int stickySessionSeconds) { + this.stickySessionSeconds = stickySessionSeconds; + return this; + } + + public Builder failoverToPrimary(boolean failoverToPrimary) { + this.failoverToPrimary = failoverToPrimary; + return this; + } + + public Builder addReplica(String replicaName) { + this.replicaNames.add(replicaName); + return this; + } + + public Builder replicas(List replicaNames) { + this.replicaNames = new ArrayList<>(replicaNames); + return this; + } + + public ReadWriteConfiguration build() { + ReadWriteConfiguration config = new ReadWriteConfiguration( + primaryName, enabled, strategy, stickySessionSeconds, failoverToPrimary, replicaNames); + config.validate(); + return config; + } + } +} diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParser.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParser.java new file mode 100644 index 000000000..c0425f1cf --- /dev/null +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParser.java @@ -0,0 +1,257 @@ +package org.openjproxy.grpc.server.readwrite; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Parses read/write splitting configuration from Properties. + * + *

Configuration format: + *

+ * # Primary configuration
+ * primary.ojp.readwrite.enabled=true
+ * primary.ojp.readwrite.role=primary
+ * primary.ojp.readwrite.replicaSelectionStrategy=ROUND_ROBIN
+ * primary.ojp.readwrite.stickySessionSeconds=5
+ * primary.ojp.readwrite.replicaFailoverToPrimary=true
+ *
+ * # Replica configuration
+ * replica1.ojp.readwrite.role=replica
+ * replica1.ojp.readwrite.primary=primary
+ * 
+ * + *

All read/write configuration is supplied by the JDBC client via {@link java.util.Properties} + * passed to {@link java.sql.DriverManager#getConnection(String, Properties)} and forwarded to + * the OJP server as gRPC metadata. No server-side properties file is required. + */ +@Slf4j +public class ReadWriteConfigurationParser { + + // Private constructor to prevent instantiation of utility class + private ReadWriteConfigurationParser() { + throw new UnsupportedOperationException("Utility class - do not instantiate"); + } + + private static final String READWRITE_PREFIX = ".ojp.readwrite."; + private static final String ENABLED_SUFFIX = "enabled"; + private static final String ROLE_SUFFIX = "role"; + private static final String PRIMARY_SUFFIX = "primary"; + private static final String STRATEGY_SUFFIX = "replicaSelectionStrategy"; + private static final String STICKY_SESSION_SUFFIX = "stickySessionSeconds"; + private static final String FAILOVER_SUFFIX = "replicaFailoverToPrimary"; + + private static final String ROLE_PRIMARY = "primary"; + private static final String ROLE_REPLICA = "replica"; + + // Cache for parsed configurations + private static final ConcurrentMap CONFIG_CACHE = new ConcurrentHashMap<>(); + + /** + * Parses read/write configuration for all datasources from properties. 
+ * + * @param properties configuration properties + * @return map of primary datasource name to ReadWriteConfiguration + * @throws IllegalArgumentException if configuration is invalid + */ + public static Map parseAll(Properties properties) { + Map configs = new HashMap<>(); + + // First pass: find all primaries + Set primaries = findPrimaries(properties); + + // Second pass: build configuration for each primary + for (String primaryName : primaries) { + ReadWriteConfiguration config = parseForPrimary(primaryName, properties); + configs.put(primaryName, config); + + log.info("Parsed read/write configuration for primary '{}': {}", primaryName, config); + } + + return configs; + } + + /** + * Parses read/write configuration for a specific primary datasource. + * Uses cache to avoid reparsing. + * + * @param primaryName name of the primary datasource + * @param properties configuration properties + * @return ReadWriteConfiguration for this primary, or null if not configured + */ + public static ReadWriteConfiguration parseForPrimary(String primaryName, Properties properties) { + // Check cache first + ReadWriteConfiguration cached = CONFIG_CACHE.get(primaryName); + if (cached != null) { + return cached; + } + + // Parse configuration + String prefix = primaryName + READWRITE_PREFIX; + + // Check if read/write splitting is enabled for this primary + boolean enabled = getBooleanProperty(properties, prefix + ENABLED_SUFFIX, false); + + // Verify role is explicitly set to "primary" + String role = getStringProperty(properties, prefix + ROLE_SUFFIX, ""); + if (!role.isEmpty() && !ROLE_PRIMARY.equals(role)) { + log.warn("Datasource '{}' has readwrite.role='{}' but is not 'primary', skipping read/write config", + primaryName, role); + return null; + } + + // Parse strategy + String strategyStr = getStringProperty(properties, prefix + STRATEGY_SUFFIX, "ROUND_ROBIN"); + ReadWriteConfiguration.ReplicaSelectionStrategy strategy; + try { + strategy = 
ReadWriteConfiguration.ReplicaSelectionStrategy.valueOf(strategyStr.toUpperCase()); + } catch (IllegalArgumentException e) { + log.warn("Invalid replicaSelectionStrategy '{}' for primary '{}', using ROUND_ROBIN", + strategyStr, primaryName); + strategy = ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN; + } + + // Parse other settings — default is 0 (disabled); sticky sessions are opt-in + int stickySessionSeconds = getIntProperty(properties, prefix + STICKY_SESSION_SUFFIX, 0); + boolean failoverToPrimary = getBooleanProperty(properties, prefix + FAILOVER_SUFFIX, true); + + // Find all replicas for this primary + List replicaNames = findReplicasForPrimary(primaryName, properties); + + // Build configuration + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName(primaryName) + .enabled(enabled) + .strategy(strategy) + .stickySessionSeconds(stickySessionSeconds) + .failoverToPrimary(failoverToPrimary) + .replicas(replicaNames) + .build(); + + // Only cache when read/write splitting is actually active (enabled with at least one replica). + // A disabled or replica-less config is NOT cached so that the next connection attempt that + // carries the full properties can re-evaluate and set up splitting correctly. 
+ if (config.isEnabled() && !config.getReplicaNames().isEmpty()) { + CONFIG_CACHE.put(primaryName, config); + } + + return config; + } + + /** + * Finds all datasources configured as primaries + */ + private static Set findPrimaries(Properties properties) { + Set primaries = new HashSet<>(); + + for (String propertyName : properties.stringPropertyNames()) { + if (propertyName.contains(READWRITE_PREFIX + ROLE_SUFFIX)) { + String role = properties.getProperty(propertyName); + if (ROLE_PRIMARY.equals(role)) { + // Extract datasource name (everything before .ojp.readwrite.role) + String datasourceName = propertyName.substring(0, propertyName.indexOf(READWRITE_PREFIX)); + primaries.add(datasourceName); + } + } + } + + return primaries; + } + + /** + * Finds all replicas configured for a specific primary + */ + private static List findReplicasForPrimary(String primaryName, Properties properties) { + List replicas = new ArrayList<>(); + + for (String propertyName : properties.stringPropertyNames()) { + if (propertyName.contains(READWRITE_PREFIX + ROLE_SUFFIX)) { + String role = properties.getProperty(propertyName); + if (ROLE_REPLICA.equals(role)) { + // Extract datasource name + String datasourceName = propertyName.substring(0, propertyName.indexOf(READWRITE_PREFIX)); + + // Check if this replica references our primary + String referencedPrimary = getStringProperty(properties, + datasourceName + READWRITE_PREFIX + PRIMARY_SUFFIX, ""); + + if (primaryName.equals(referencedPrimary)) { + replicas.add(datasourceName); + } + } + } + } + + return replicas; + } + + /** + * Validates that all replica references point to valid primaries + */ + public static void validateReplicaReferences(Properties properties) { + Set primaries = findPrimaries(properties); + + for (String propertyName : properties.stringPropertyNames()) { + if (propertyName.contains(READWRITE_PREFIX + ROLE_SUFFIX)) { + String role = properties.getProperty(propertyName); + if (ROLE_REPLICA.equals(role)) { + String 
datasourceName = propertyName.substring(0, propertyName.indexOf(READWRITE_PREFIX)); + String referencedPrimary = getStringProperty(properties, + datasourceName + READWRITE_PREFIX + PRIMARY_SUFFIX, ""); + + if (referencedPrimary.isEmpty()) { + throw new IllegalArgumentException( + "Replica '" + datasourceName + "' does not specify a primary datasource"); + } + + if (!primaries.contains(referencedPrimary)) { + throw new IllegalArgumentException( + "Replica '" + datasourceName + "' references unknown primary '" + referencedPrimary + "'"); + } + } + } + } + } + + /** + * Clears the configuration cache. Useful for testing. + */ + public static void clearCache() { + CONFIG_CACHE.clear(); + } + + // Property parsing helpers + + private static String getStringProperty(Properties properties, String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } + + private static boolean getBooleanProperty(Properties properties, String key, boolean defaultValue) { + String value = properties.getProperty(key); + if (value == null) { + return defaultValue; + } + return Boolean.parseBoolean(value); + } + + private static int getIntProperty(Properties properties, String key, int defaultValue) { + String value = properties.getProperty(key); + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + log.warn("Invalid integer value '{}' for property '{}', using default {}", value, key, defaultValue); + return defaultValue; + } + } +} diff --git a/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManager.java b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManager.java new file mode 100644 index 000000000..a77c02343 --- /dev/null +++ b/ojp-server/src/main/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManager.java @@ -0,0 +1,240 @@ +package org.openjproxy.grpc.server.readwrite; + +import 
package org.openjproxy.grpc.server.readwrite;

import com.openjproxy.grpc.ConnectionDetails;
import com.openjproxy.grpc.PropertyEntry;
import lombok.extern.slf4j.Slf4j;
import org.openjproxy.datasource.ConnectionPoolProviderRegistry;
import org.openjproxy.datasource.PoolConfig;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

/**
 * Manages creation and registration of primary and replica datasources for read/write splitting.
 * This class handles parsing read/write configuration and creating appropriate datasource pools.
 */
@Slf4j
public class ReadWriteDataSourceManager {

    private final ReadWriteDataSourceRegistry registry;

    /**
     * @param registry registry tracking primary/replica mappings; must not be null
     */
    public ReadWriteDataSourceManager(ReadWriteDataSourceRegistry registry) {
        this.registry = Objects.requireNonNull(registry, "registry cannot be null");
    }

    /**
     * Checks if read/write splitting is configured for the given connection details.
     *
     * @param connectionDetails connection details with properties
     * @param datasourceName    name of the datasource
     * @return true if read/write splitting is configured and enabled
     */
    public boolean isReadWriteSplittingEnabled(ConnectionDetails connectionDetails, String datasourceName) {
        if (connectionDetails.getPropertiesCount() == 0) {
            return false;
        }

        ReadWriteConfiguration rwConfig = parseReadWriteConfig(connectionDetails, datasourceName).config;

        return rwConfig != null && rwConfig.isEnabled() && !rwConfig.getReplicaNames().isEmpty();
    }

    /**
     * Creates primary and replica datasources based on read/write configuration.
     * Registers them in the ReadWriteDataSourceRegistry.
     *
     * @param connectionDetails primary connection details
     * @param primaryConnHash   connection hash for the primary
     * @param primaryDs         the primary datasource (already created)
     * @param datasourceName    name of the datasource
     * @return ReadWriteConfiguration if read/write splitting was configured, null otherwise
     */
    public ReadWriteConfiguration setupReadWriteSplitting(
            ConnectionDetails connectionDetails,
            String primaryConnHash,
            DataSource primaryDs,
            String datasourceName) {

        // Idempotency guard: when the primary is already mapped and replicas exist, do nothing.
        boolean alreadyMapped = registry.getPrimaryName(primaryConnHash) != null;
        if (alreadyMapped && registry.hasReplicas(datasourceName)) {
            log.debug("Read/write splitting already active for primary '{}' (connHash={}), skipping re-setup",
                    datasourceName, primaryConnHash);
            return null;
        }

        if (connectionDetails.getPropertiesCount() == 0) {
            log.debug("No properties provided, read/write splitting not configured");
            return null;
        }

        // Parse configuration (the gRPC → java.util.Properties conversion happens exactly once).
        ParsedConfig parseResult = parseReadWriteConfig(connectionDetails, datasourceName);
        ReadWriteConfiguration rwConfig = parseResult.config;
        Properties javaProperties = parseResult.props;

        if (rwConfig == null || !rwConfig.isEnabled()) {
            log.debug("Read/write splitting not enabled for datasource '{}'", datasourceName);
            return null;
        }

        if (rwConfig.getReplicaNames().isEmpty()) {
            log.warn("Read/write splitting enabled for '{}' but no replicas configured", datasourceName);
            return null;
        }

        log.info("Setting up read/write splitting for primary '{}' with {} replicas",
                datasourceName, rwConfig.getReplicaNames().size());

        // Register primary datasource
        registry.registerPrimaryMapping(primaryConnHash, datasourceName);

        // Register sticky session timeout
        registry.registerStickyTimeout(datasourceName, rwConfig.getStickySessionSeconds());

        // Register replica selection strategy
        registry.registerStrategy(datasourceName, rwConfig.getStrategy());

        // Create and register each replica datasource; a single bad replica never aborts the rest.
        List<String> activeReplicas = new ArrayList<>();
        for (String replicaName : rwConfig.getReplicaNames()) {
            try {
                DataSource replicaDataSource = createReplicaDataSource(replicaName, javaProperties);
                if (replicaDataSource != null) {
                    registry.registerReplica(datasourceName, replicaDataSource);
                    activeReplicas.add(replicaName);
                    log.info("Successfully created and registered replica datasource '{}'", replicaName);
                }
            } catch (Exception e) {
                log.error("Failed to create replica datasource '{}': {}", replicaName, e.getMessage(), e);
                // Continue with other replicas
            }
        }

        if (activeReplicas.isEmpty()) {
            log.warn("No replicas successfully created for primary '{}', read/write splitting will not be active",
                    datasourceName);
            return null;
        }

        log.info("Read/write splitting configured for primary '{}' with {} active replicas: {}",
                datasourceName, activeReplicas.size(), activeReplicas);

        return rwConfig;
    }

    /**
     * Creates a replica datasource based on configuration properties.
     *
     * @param replicaName name of the replica datasource
     * @param props       configuration properties
     * @return DataSource for the replica, or null if configuration is invalid
     */
    private DataSource createReplicaDataSource(String replicaName, Properties props) {
        // Replica-specific keys all live under "{replicaName}.ojp."
        String replicaPrefix = replicaName + ".ojp.";

        // The JDBC URL is mandatory — without it the replica cannot be created.
        String url = props.getProperty(replicaPrefix + "connection.url");
        if (url == null || url.trim().isEmpty()) {
            log.error("No connection URL configured for replica '{}' (looked for {}connection.url), skipping",
                    replicaName, replicaPrefix);
            return null;
        }

        // Credentials are optional and default to empty strings.
        String user = props.getProperty(replicaPrefix + "connection.user", "");
        String password = props.getProperty(replicaPrefix + "connection.password", "");

        // Pool settings with the documented defaults.
        int maxPoolSize = getIntProperty(props, replicaPrefix + "pool.maxPoolSize", 10);
        int minIdle = getIntProperty(props, replicaPrefix + "pool.minIdle", 2);
        long connectionTimeout = getLongProperty(props, replicaPrefix + "pool.connectionTimeout", 30000);
        long idleTimeout = getLongProperty(props, replicaPrefix + "pool.idleTimeout", 600000);
        long maxLifetime = getLongProperty(props, replicaPrefix + "pool.maxLifetime", 1800000);

        try {
            PoolConfig poolConfig = PoolConfig.builder()
                    .url(url)
                    .username(user)
                    .password(password)
                    .maxPoolSize(maxPoolSize)
                    .minIdle(minIdle)
                    .connectionTimeoutMs(connectionTimeout)
                    .idleTimeoutMs(idleTimeout)
                    .maxLifetimeMs(maxLifetime)
                    .defaultTransactionIsolation(java.sql.Connection.TRANSACTION_READ_COMMITTED)
                    .metricsPrefix("OJP-Replica-" + replicaName)
                    .build();

            DataSource dataSource = ConnectionPoolProviderRegistry.createDataSource(poolConfig);
            log.info("Created replica datasource '{}' with URL: {}, maxPoolSize: {}, minIdle: {}",
                    replicaName, url, maxPoolSize, minIdle);

            return dataSource;
        } catch (Exception e) {
            log.error("Failed to create datasource for replica '{}': {}", replicaName, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Converts gRPC PropertyEntry list to Java Properties object and parses the read/write configuration.
     * Centralizes the conversion so it is performed exactly once per public API call.
     */
    private ParsedConfig parseReadWriteConfig(ConnectionDetails connectionDetails, String datasourceName) {
        Properties props = convertPropertiesToJava(connectionDetails.getPropertiesList());
        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary(datasourceName, props);
        return new ParsedConfig(config, props);
    }

    /** Holder for a parsed {@link ReadWriteConfiguration} together with the already-converted properties. */
    private static final class ParsedConfig {
        private final ReadWriteConfiguration config;
        private final Properties props;

        ParsedConfig(ReadWriteConfiguration config, Properties props) {
            this.config = config;
            this.props = props;
        }
    }

    /**
     * Converts gRPC PropertyEntry list to Java Properties object.
     */
    private Properties convertPropertiesToJava(List<PropertyEntry> propertyEntries) {
        Properties props = new Properties();
        for (PropertyEntry entry : propertyEntries) {
            props.setProperty(entry.getKey(), entry.getStringValue());
        }
        return props;
    }

    /** Parses an int property, trimming whitespace; invalid or blank values fall back to the default. */
    private int getIntProperty(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            log.warn("Invalid integer value for property '{}': {}, using default: {}", key, value, defaultValue);
            return defaultValue;
        }
    }

    /** Parses a long property, trimming whitespace; invalid or blank values fall back to the default. */
    private long getLongProperty(Properties props, String key, long defaultValue) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            log.warn("Invalid long value for property '{}': {}, using default: {}", key, value, defaultValue);
            return defaultValue;
        }
    }
}
package org.openjproxy.grpc.server.readwrite;

import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Registry for managing primary and replica datasource mappings in read/write splitting.
 *
 * <p>This class maintains the mapping between primary datasources and their associated
 * read replicas. It provides thread-safe operations for registering datasources and
 * retrieving replica lists for routing decisions.
 *
 * <p>Thread Safety: all maps are {@link ConcurrentHashMap}s and the per-primary replica
 * lists are {@link CopyOnWriteArrayList}s, so replicas can be registered concurrently
 * with routing lookups that iterate the list (registration is rare, reads dominate).
 */
@Slf4j
public class ReadWriteDataSourceRegistry {

    // Map of primary datasource name → replica list.
    // CopyOnWriteArrayList: callers of getReplicas() iterate while registerReplica()
    // may append concurrently; a plain ArrayList would risk ConcurrentModificationException.
    private final Map<String, List<DataSource>> replicaMap = new ConcurrentHashMap<>();

    // Map of connection hash → primary datasource name (for session affinity)
    private final Map<String, String> primaryMappings = new ConcurrentHashMap<>();

    // Map of primary datasource name → sticky session timeout in seconds (0 = disabled)
    private final Map<String, Integer> stickyTimeoutMap = new ConcurrentHashMap<>();

    // Map of primary datasource name → replica selection strategy
    private final Map<String, ReadWriteConfiguration.ReplicaSelectionStrategy> strategyMap =
            new ConcurrentHashMap<>();

    // Map of primary datasource name → timestamp (epoch ms) of the last write (for sticky sessions)
    private final Map<String, Long> lastWriteTimestamps = new ConcurrentHashMap<>();

    /**
     * Registers a mapping from connection hash to primary datasource name.
     * This is used to associate client connections with their primary datasource.
     *
     * @param connectionHash the unique connection identifier
     * @param primaryName    the primary datasource name
     * @throws IllegalArgumentException if either argument is null
     */
    public void registerPrimaryMapping(String connectionHash, String primaryName) {
        if (connectionHash == null || primaryName == null) {
            throw new IllegalArgumentException("connectionHash and primaryName must not be null");
        }
        primaryMappings.put(connectionHash, primaryName);
        log.debug("Registered primary mapping: {} -> {}", connectionHash, primaryName);
    }

    /**
     * Registers a replica datasource for a given primary.
     * Multiple replicas can be registered for the same primary.
     *
     * @param primaryName       the primary datasource name
     * @param replicaDataSource the replica datasource to register
     * @throws IllegalArgumentException if either argument is null
     */
    public void registerReplica(String primaryName, DataSource replicaDataSource) {
        if (primaryName == null || replicaDataSource == null) {
            throw new IllegalArgumentException("primaryName and replicaDataSource must not be null");
        }
        replicaMap.computeIfAbsent(primaryName, k -> new CopyOnWriteArrayList<>())
                .add(replicaDataSource);
        log.debug("Registered replica for primary: {}", primaryName);
    }

    /**
     * Gets the list of replica datasources for a given primary.
     *
     * @param primaryName the primary datasource name
     * @return unmodifiable list of replica datasources, empty list if no replicas registered
     */
    public List<DataSource> getReplicas(String primaryName) {
        List<DataSource> replicas = replicaMap.get(primaryName);
        if (replicas == null) {
            return Collections.emptyList();
        }
        // Safe to expose as a view: the backing list is copy-on-write, so iteration
        // never observes a partially-applied mutation.
        return Collections.unmodifiableList(replicas);
    }

    /**
     * Gets the primary datasource name for a given connection hash.
     *
     * @param connectionHash the connection hash
     * @return the primary datasource name, or null if not found
     */
    public String getPrimaryName(String connectionHash) {
        return primaryMappings.get(connectionHash);
    }

    /**
     * Checks if a primary datasource has any replicas registered.
     *
     * @param primaryName the primary datasource name
     * @return true if at least one replica is registered, false otherwise
     */
    public boolean hasReplicas(String primaryName) {
        List<DataSource> replicas = replicaMap.get(primaryName);
        return replicas != null && !replicas.isEmpty();
    }

    /**
     * Gets the count of replicas for a given primary.
     *
     * @param primaryName the primary datasource name
     * @return the number of replicas, 0 if none registered
     */
    public int getReplicaCount(String primaryName) {
        List<DataSource> replicas = replicaMap.get(primaryName);
        return replicas != null ? replicas.size() : 0;
    }

    /**
     * Removes all registered replicas for a given primary.
     *
     * @param primaryName the primary datasource name
     */
    public void clearReplicas(String primaryName) {
        replicaMap.remove(primaryName);
        log.debug("Cleared all replicas for primary: {}", primaryName);
    }

    /**
     * Removes a primary mapping for a connection hash.
     *
     * @param connectionHash the connection hash
     */
    public void removePrimaryMapping(String connectionHash) {
        primaryMappings.remove(connectionHash);
        log.debug("Removed primary mapping for connection: {}", connectionHash);
    }

    /**
     * Registers the sticky session timeout (in seconds) for a given primary datasource.
     * A value of 0 disables sticky session behaviour.
     *
     * @param primaryName          the primary datasource name
     * @param stickyTimeoutSeconds the duration in seconds to route reads to primary after a write
     * @throws IllegalArgumentException if primaryName is null
     */
    public void registerStickyTimeout(String primaryName, int stickyTimeoutSeconds) {
        if (primaryName == null) {
            throw new IllegalArgumentException("primaryName must not be null");
        }
        stickyTimeoutMap.put(primaryName, stickyTimeoutSeconds);
        log.debug("Registered sticky session timeout for primary '{}': {}s", primaryName, stickyTimeoutSeconds);
    }

    /**
     * Returns the sticky session timeout in seconds for the given primary datasource.
     * Returns 0 if no timeout has been registered (sticky sessions disabled).
     *
     * @param primaryName the primary datasource name
     * @return the sticky session timeout in seconds, or 0 if not configured
     */
    public int getStickySessionSeconds(String primaryName) {
        return stickyTimeoutMap.getOrDefault(primaryName, 0);
    }

    /**
     * Registers the replica selection strategy for a given primary datasource.
     *
     * @param primaryName the primary datasource name
     * @param strategy    the strategy to use when selecting a replica
     * @throws IllegalArgumentException if either argument is null
     */
    public void registerStrategy(String primaryName,
                                 ReadWriteConfiguration.ReplicaSelectionStrategy strategy) {
        if (primaryName == null || strategy == null) {
            throw new IllegalArgumentException("primaryName and strategy must not be null");
        }
        strategyMap.put(primaryName, strategy);
        log.debug("Registered replica selection strategy for primary '{}': {}", primaryName, strategy);
    }

    /**
     * Returns the replica selection strategy for the given primary datasource.
     * Defaults to {@link ReadWriteConfiguration.ReplicaSelectionStrategy#ROUND_ROBIN}
     * if none has been registered.
     *
     * @param primaryName the primary datasource name
     * @return the configured strategy, or {@code ROUND_ROBIN} if not set
     */
    public ReadWriteConfiguration.ReplicaSelectionStrategy getStrategy(String primaryName) {
        return strategyMap.getOrDefault(primaryName,
                ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN);
    }

    /**
     * Records that a write operation has just been performed on the given primary.
     * This timestamp is used to determine whether the sticky-session window is still active.
     *
     * @param primaryName the primary datasource name (null is ignored)
     */
    public void markWrite(String primaryName) {
        if (primaryName == null) {
            return;
        }
        lastWriteTimestamps.put(primaryName, System.currentTimeMillis());
        log.debug("[RW-SPLIT] markWrite: sticky session started for primary='{}', timeout={}s",
                primaryName, stickyTimeoutMap.getOrDefault(primaryName, 0));
    }

    /**
     * Returns {@code true} when a sticky-session window is currently active for the
     * given primary. A sticky window is active when all of the following hold:
     * <ol>
     *   <li>A sticky timeout greater than zero has been registered.</li>
     *   <li>A write has occurred within the last {@code stickyTimeoutSeconds} seconds.</li>
     * </ol>
     *
     * @param primaryName the primary datasource name
     * @return {@code true} if reads should be routed to the primary (sticky), {@code false}
     *         if they may be routed to a replica
     */
    public boolean isStickyActive(String primaryName) {
        if (primaryName == null) {
            return false;
        }
        int timeoutSeconds = stickyTimeoutMap.getOrDefault(primaryName, 0);
        if (timeoutSeconds <= 0) {
            log.debug("[RW-SPLIT] isStickyActive: primary='{}', timeoutSeconds={} (disabled), sticky=false",
                    primaryName, timeoutSeconds);
            return false;
        }
        Long lastWrite = lastWriteTimestamps.get(primaryName);
        if (lastWrite == null) {
            log.debug("[RW-SPLIT] isStickyActive: primary='{}', no write timestamp recorded, sticky=false", primaryName);
            return false;
        }
        long elapsed = System.currentTimeMillis() - lastWrite;
        boolean active = elapsed < (long) timeoutSeconds * 1000;
        log.debug("[RW-SPLIT] isStickyActive: primary='{}', elapsedMs={}, timeoutMs={}, sticky={}",
                primaryName, elapsed, (long) timeoutSeconds * 1000, active);
        return active;
    }

    /**
     * Clears all registered datasources and mappings.
     * This is primarily for testing and cleanup purposes.
     */
    public void clear() {
        replicaMap.clear();
        primaryMappings.clear();
        stickyTimeoutMap.clear();
        strategyMap.clear();
        lastWriteTimestamps.clear();
        log.debug("Cleared all registry data");
    }
}
/**
 * Classifies SQL statements as read-only or write operations to support
 * read/write traffic splitting.
 *
 * <p>SELECT, WITH (CTEs), EXPLAIN, SHOW, DESCRIBE and DESC statements are
 * treated as read-only and may be routed to a replica. All other statements
 * (INSERT, UPDATE, DELETE, MERGE, DDL, etc.) are treated as writes and are
 * always routed to the primary. Unrecognised, {@code null} and blank input is
 * conservatively classified as a write.
 */
public final class ReadWriteSqlClassifier {

    /** Indicates whether a SQL statement modifies data. */
    public enum QueryType {
        READ,
        WRITE
    }

    private ReadWriteSqlClassifier() {
        throw new UnsupportedOperationException("Utility class");
    }

    /**
     * Classifies the given SQL statement as {@link QueryType#READ} or
     * {@link QueryType#WRITE}.
     *
     * @param sql the SQL statement to classify (may be {@code null})
     * @return {@link QueryType#READ} for read-only statements,
     *         {@link QueryType#WRITE} for all other statements and for
     *         {@code null} / blank input
     */
    public static QueryType classify(String sql) {
        if (sql == null || sql.isBlank()) {
            return QueryType.WRITE;
        }
        // Upper-case with Locale.ROOT: with a Turkish default locale,
        // "describe".toUpperCase() yields "DESCRİBE" (dotted capital I), which
        // would silently misclassify a read as a write and defeat the split.
        String upper = sql.stripLeading().toUpperCase(java.util.Locale.ROOT);
        if (isKeyword(upper, "SELECT")
                || isKeyword(upper, "WITH")
                || isKeyword(upper, "EXPLAIN")
                || isKeyword(upper, "SHOW")
                || isKeyword(upper, "DESCRIBE")
                || isKeyword(upper, "DESC")) {
            return QueryType.READ;
        }
        return QueryType.WRITE;
    }

    /**
     * Returns {@code true} when {@code upper} starts with {@code keyword} and the
     * character immediately following the keyword (if any) is not an alphanumeric
     * character. This ensures that e.g. {@code "DESCusers"} is not treated as a
     * {@code DESC} statement while {@code "DESC users"}, {@code "DESC\nusers"},
     * and {@code "DESCRIBE users"} are all recognised correctly.
     */
    private static boolean isKeyword(String upper, String keyword) {
        if (!upper.startsWith(keyword)) {
            return false;
        }
        if (upper.length() == keyword.length()) {
            return true;
        }
        return !Character.isLetterOrDigit(upper.charAt(keyword.length()));
    }
}
a/ojp-server/src/test/java/org/openjproxy/grpc/server/action/connection/HandleUnpooledXAConnectionActionTest.java +++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/action/connection/HandleUnpooledXAConnectionActionTest.java @@ -12,6 +12,7 @@ import org.openjproxy.grpc.server.SessionManager; import org.openjproxy.grpc.server.SlowQuerySegregationManager; import org.openjproxy.grpc.server.action.ActionContext; +import org.openjproxy.grpc.server.readwrite.ReadWriteDataSourceRegistry; import org.openjproxy.grpc.server.xa.XADataSourceFactory; import org.openjproxy.xa.pool.spi.XAConnectionPoolProvider; @@ -47,6 +48,7 @@ void testExecuteCreatesSlowQuerySegregationManager() { new ConcurrentHashMap<>(), slowQueryManagers, new ConcurrentHashMap<>(), + new ReadWriteDataSourceRegistry(), mock(XAConnectionPoolProvider.class), new MultinodeXaCoordinator(), new ClusterHealthTracker(), diff --git a/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelectorTest.java b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelectorTest.java new file mode 100644 index 000000000..8cd62db50 --- /dev/null +++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadReplicaSelectorTest.java @@ -0,0 +1,101 @@ +package org.openjproxy.grpc.server.readwrite; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.sql.DataSource; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +/** + * Tests for ReadReplicaSelector + */ +class ReadReplicaSelectorTest { + + private ReadReplicaSelector selector; + private DataSource ds1; + private DataSource ds2; + private DataSource ds3; + + @BeforeEach + void setUp() { + selector = new ReadReplicaSelector(); + ds1 = mock(DataSource.class); + ds2 = mock(DataSource.class); + ds3 = mock(DataSource.class); + } + + @Test + void shouldReturnNullWhenNoReplicasAvailable() { + DataSource result = 
selector.select("primary", List.of(), + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + assertNull(result); + } + + @Test + void shouldReturnOnlyReplicaForRoundRobin() { + DataSource result = selector.select("primary", List.of(ds1), + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + assertEquals(ds1, result); + } + + @Test + void shouldCycleThroughReplicasWithRoundRobin() { + List replicas = List.of(ds1, ds2, ds3); + + DataSource first = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + DataSource second = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + DataSource third = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + DataSource fourth = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN); + + // Should cycle: ds1 → ds2 → ds3 → ds1 + assertEquals(ds1, first); + assertEquals(ds2, second); + assertEquals(ds3, third); + assertEquals(ds1, fourth); + } + + @Test + void shouldReturnReplicaForRandomStrategy() { + List replicas = List.of(ds1, ds2); + DataSource result = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM); + assertTrue(replicas.contains(result), "Random selection must return one of the registered replicas"); + } + + @Test + void shouldUseRoundRobinFallbackForLeastConnections() { + List replicas = List.of(ds1, ds2); + + DataSource first = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.LEAST_CONNECTIONS); + DataSource second = selector.select("primary", replicas, + ReadWriteConfiguration.ReplicaSelectionStrategy.LEAST_CONNECTIONS); + + // LEAST_CONNECTIONS falls back to round-robin in Phase 2 + assertNotNull(first); + assertNotNull(second); + assertTrue(replicas.contains(first)); + assertTrue(replicas.contains(second)); + } + + @Test + 
void shouldMaintainSeparateCountersPerPrimary() {
+        List<DataSource> replicas = List.of(ds1, ds2);
+
+        DataSource primary1First = selector.select("primary1", replicas,
+                ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN);
+        DataSource primary2First = selector.select("primary2", replicas,
+                ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN);
+
+        // Each primary has its own counter starting at 0 → both pick ds1
+        assertEquals(ds1, primary1First);
+        assertEquals(ds1, primary2First);
+    }
+}
diff --git a/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParserTest.java b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParserTest.java
new file mode 100644
index 000000000..b2df0bbb4
--- /dev/null
+++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationParserTest.java
@@ -0,0 +1,291 @@
+package org.openjproxy.grpc.server.readwrite;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for ReadWriteConfigurationParser class
+ */
+class ReadWriteConfigurationParserTest {
+
+    @BeforeEach
+    void setUp() {
+        ReadWriteConfigurationParser.clearCache();
+    }
+
+    @AfterEach
+    void tearDown() {
+        ReadWriteConfigurationParser.clearCache();
+    }
+
+    @Test
+    void testParseBasicConfiguration() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.enabled", "true");
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        props.setProperty("primary.ojp.readwrite.replicaSelectionStrategy", "ROUND_ROBIN");
+        props.setProperty("primary.ojp.readwrite.stickySessionSeconds", "5");
+        props.setProperty("primary.ojp.readwrite.replicaFailoverToPrimary", "true");
+
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        
props.setProperty("replica1.ojp.readwrite.primary", "primary");
+
+        Map<String, ReadWriteConfiguration> configs = ReadWriteConfigurationParser.parseAll(props);
+
+        assertEquals(1, configs.size());
+        assertTrue(configs.containsKey("primary"));
+
+        ReadWriteConfiguration config = configs.get("primary");
+        assertEquals("primary", config.getPrimaryName());
+        assertTrue(config.isEnabled());
+        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy());
+        assertEquals(5, config.getStickySessionSeconds());
+        assertTrue(config.isFailoverToPrimary());
+        assertEquals(1, config.getReplicaCount());
+        assertTrue(config.getReplicaNames().contains("replica1"));
+    }
+
+    @Test
+    void testParseMultipleReplicas() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.enabled", "true");
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        props.setProperty("replica1.ojp.readwrite.primary", "primary");
+
+        props.setProperty("replica2.ojp.readwrite.role", "replica");
+        props.setProperty("replica2.ojp.readwrite.primary", "primary");
+
+        props.setProperty("replica3.ojp.readwrite.role", "replica");
+        props.setProperty("replica3.ojp.readwrite.primary", "primary");
+
+        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props);
+
+        assertEquals(3, config.getReplicaCount());
+        assertTrue(config.getReplicaNames().contains("replica1"));
+        assertTrue(config.getReplicaNames().contains("replica2"));
+        assertTrue(config.getReplicaNames().contains("replica3"));
+    }
+
+    @Test
+    void testParseMultiplePrimaries() {
+        Properties props = new Properties();
+
+        // Primary 1
+        props.setProperty("db1.ojp.readwrite.enabled", "true");
+        props.setProperty("db1.ojp.readwrite.role", "primary");
+        props.setProperty("db1_replica.ojp.readwrite.role", "replica");
+        props.setProperty("db1_replica.ojp.readwrite.primary", "db1");
+
+        // Primary 2
+        
props.setProperty("db2.ojp.readwrite.enabled", "true");
+        props.setProperty("db2.ojp.readwrite.role", "primary");
+        props.setProperty("db2_replica.ojp.readwrite.role", "replica");
+        props.setProperty("db2_replica.ojp.readwrite.primary", "db2");
+
+        Map<String, ReadWriteConfiguration> configs = ReadWriteConfigurationParser.parseAll(props);
+
+        assertEquals(2, configs.size());
+        assertTrue(configs.containsKey("db1"));
+        assertTrue(configs.containsKey("db2"));
+
+        assertEquals(1, configs.get("db1").getReplicaCount());
+        assertEquals(1, configs.get("db2").getReplicaCount());
+    }
+
+    @Test
+    void testParseDisabledConfiguration() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.enabled", "false");
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+
+        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props);
+
+        assertFalse(config.isEnabled());
+    }
+
+    @Test
+    void testParseDefaultValues() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+
+        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props);
+
+        assertFalse(config.isEnabled()); // Default: disabled
+        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy());
+        assertEquals(0, config.getStickySessionSeconds());
+        assertTrue(config.isFailoverToPrimary());
+    }
+
+    @Test
+    void testParseCustomStrategy() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        props.setProperty("primary.ojp.readwrite.replicaSelectionStrategy", "RANDOM");
+
+        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props);
+
+        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM, config.getStrategy());
+    }
+
+    @Test
+    void testParseInvalidStrategy() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        
props.setProperty("primary.ojp.readwrite.replicaSelectionStrategy", "INVALID"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + // Should fall back to default + assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy()); + } + + @Test + void testParseCaseInsensitiveStrategy() { + Properties props = new Properties(); + props.setProperty("primary.ojp.readwrite.role", "primary"); + props.setProperty("primary.ojp.readwrite.replicaSelectionStrategy", "round_robin"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy()); + } + + @Test + void testParseCustomStickySessionSeconds() { + Properties props = new Properties(); + props.setProperty("primary.ojp.readwrite.role", "primary"); + props.setProperty("primary.ojp.readwrite.stickySessionSeconds", "10"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + assertEquals(10, config.getStickySessionSeconds()); + } + + @Test + void testParseInvalidStickySessionSeconds() { + Properties props = new Properties(); + props.setProperty("primary.ojp.readwrite.role", "primary"); + props.setProperty("primary.ojp.readwrite.stickySessionSeconds", "invalid"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + // Should use default (0 — disabled) + assertEquals(0, config.getStickySessionSeconds()); + } + + @Test + void testParseFailoverToPrimaryDisabled() { + Properties props = new Properties(); + props.setProperty("primary.ojp.readwrite.role", "primary"); + props.setProperty("primary.ojp.readwrite.replicaFailoverToPrimary", "false"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + assertFalse(config.isFailoverToPrimary()); + } + + @Test + void 
testValidateReplicaReferences_Valid() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        props.setProperty("replica1.ojp.readwrite.primary", "primary");
+
+        // Should not throw
+        assertDoesNotThrow(() -> ReadWriteConfigurationParser.validateReplicaReferences(props));
+    }
+
+    @Test
+    void testValidateReplicaReferences_MissingPrimary() {
+        Properties props = new Properties();
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        props.setProperty("replica1.ojp.readwrite.primary", "nonexistent");
+
+        assertThrows(IllegalArgumentException.class, () ->
+                ReadWriteConfigurationParser.validateReplicaReferences(props));
+    }
+
+    @Test
+    void testValidateReplicaReferences_NoPrimarySpecified() {
+        Properties props = new Properties();
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        // Missing: replica1.ojp.readwrite.primary
+
+        assertThrows(IllegalArgumentException.class, () ->
+                ReadWriteConfigurationParser.validateReplicaReferences(props));
+    }
+
+    @Test
+    void testNoConfiguration() {
+        Properties props = new Properties();
+
+        Map<String, ReadWriteConfiguration> configs = ReadWriteConfigurationParser.parseAll(props);
+
+        assertTrue(configs.isEmpty());
+    }
+
+    @Test
+    void testReplicaWithoutMatchingPrimary() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        props.setProperty("replica1.ojp.readwrite.role", "replica");
+        props.setProperty("replica1.ojp.readwrite.primary", "other_primary");
+
+        ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("primary", props);
+
+        // Replica doesn't reference this primary
+        assertEquals(0, config.getReplicaCount());
+    }
+
+    @Test
+    void testCaching() {
+        Properties props = new Properties();
+        props.setProperty("primary.ojp.readwrite.enabled", "true");
+        props.setProperty("primary.ojp.readwrite.role", "primary");
+        
props.setProperty("replica1.ojp.readwrite.role", "replica"); + props.setProperty("replica1.ojp.readwrite.primary", "primary"); + + ReadWriteConfiguration config1 = ReadWriteConfigurationParser.parseForPrimary("primary", props); + ReadWriteConfiguration config2 = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + // Should return same instance from cache (only cached when enabled with replicas) + assertSame(config1, config2); + } + + @Test + void testClearCache() { + Properties props = new Properties(); + props.setProperty("primary.ojp.readwrite.role", "primary"); + + ReadWriteConfiguration config1 = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + ReadWriteConfigurationParser.clearCache(); + + ReadWriteConfiguration config2 = ReadWriteConfigurationParser.parseForPrimary("primary", props); + + // Should be different instances after cache clear + assertNotSame(config1, config2); + } + + @Test + void testDatasourceNameWithUnderscore() { + Properties props = new Properties(); + props.setProperty("my_primary_db.ojp.readwrite.role", "primary"); + props.setProperty("my_replica_db.ojp.readwrite.role", "replica"); + props.setProperty("my_replica_db.ojp.readwrite.primary", "my_primary_db"); + + ReadWriteConfiguration config = ReadWriteConfigurationParser.parseForPrimary("my_primary_db", props); + + assertEquals("my_primary_db", config.getPrimaryName()); + assertEquals(1, config.getReplicaCount()); + assertTrue(config.getReplicaNames().contains("my_replica_db")); + } +} diff --git a/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationTest.java b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationTest.java new file mode 100644 index 000000000..4508d15c4 --- /dev/null +++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteConfigurationTest.java @@ -0,0 +1,190 @@ +package org.openjproxy.grpc.server.readwrite; + +import org.junit.jupiter.api.Test; + +import 
java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for ReadWriteConfiguration class + */ +class ReadWriteConfigurationTest { + + @Test + void testBasicConfiguration() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .enabled(true) + .strategy(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN) + .stickySessionSeconds(5) + .failoverToPrimary(true) + .addReplica("replica1") + .addReplica("replica2") + .build(); + + assertEquals("primary", config.getPrimaryName()); + assertTrue(config.isEnabled()); + assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy()); + assertEquals(5, config.getStickySessionSeconds()); + assertTrue(config.isFailoverToPrimary()); + assertEquals(2, config.getReplicaCount()); + assertEquals(Arrays.asList("replica1", "replica2"), config.getReplicaNames()); + assertTrue(config.hasReplicas()); + } + + @Test + void testDisabledConfiguration() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .enabled(false) + .build(); + + assertFalse(config.isEnabled()); + assertFalse(config.hasReplicas()); + assertEquals(0, config.getReplicaCount()); + } + + @Test + void testDifferentStrategies() { + ReadWriteConfiguration roundRobin = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .strategy(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN) + .build(); + + ReadWriteConfiguration random = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .strategy(ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM) + .build(); + + ReadWriteConfiguration leastConn = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .strategy(ReadWriteConfiguration.ReplicaSelectionStrategy.LEAST_CONNECTIONS) + .build(); + + assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, 
roundRobin.getStrategy());
+        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM, random.getStrategy());
+        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.LEAST_CONNECTIONS, leastConn.getStrategy());
+    }
+
+    @Test
+    void testReplicaListImmutability() {
+        List<String> replicas = Arrays.asList("replica1", "replica2");
+        ReadWriteConfiguration config = new ReadWriteConfiguration.Builder()
+                .primaryName("primary")
+                .replicas(replicas)
+                .build();
+
+        // Returned list should be immutable
+        List<String> replicaNames = config.getReplicaNames();
+        assertThrows(UnsupportedOperationException.class, () -> replicaNames.add("replica3"));
+    }
+
+    @Test
+    void testValidation_NullPrimaryName() {
+        ReadWriteConfiguration.Builder builder = new ReadWriteConfiguration.Builder().primaryName(null);
+        assertThrows(NullPointerException.class, builder::build);
+    }
+
+    @Test
+    void testValidation_EmptyPrimaryName() {
+        ReadWriteConfiguration.Builder builder = new ReadWriteConfiguration.Builder().primaryName("");
+        assertThrows(IllegalArgumentException.class, builder::build);
+    }
+
+    @Test
+    void testValidation_EnabledWithoutReplicas() {
+        ReadWriteConfiguration.Builder builder = new ReadWriteConfiguration.Builder()
+                .primaryName("primary").enabled(true);
+        assertThrows(IllegalArgumentException.class, builder::build);
+    }
+
+    @Test
+    void testValidation_NegativeStickySessionSeconds() {
+        ReadWriteConfiguration.Builder builder = new ReadWriteConfiguration.Builder()
+                .primaryName("primary").stickySessionSeconds(-1).addReplica("replica1");
+        assertThrows(IllegalArgumentException.class, builder::build);
+    }
+
+    @Test
+    void testValidation_DuplicateReplicas() {
+        ReadWriteConfiguration.Builder builder = new ReadWriteConfiguration.Builder()
+                .primaryName("primary").enabled(true).addReplica("replica1").addReplica("replica1");
+        assertThrows(IllegalArgumentException.class, builder::build);
+    }
+
+    @Test
+    void testValidation_ZeroStickySessionSeconds() {
+        // Zero is valid (instant 
expiration) + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .stickySessionSeconds(0) + .addReplica("replica1") + .build(); + + assertEquals(0, config.getStickySessionSeconds()); + } + + @Test + void testDefaultValues() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .addReplica("replica1") + .build(); + + assertFalse(config.isEnabled()); // Default: disabled + assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN, config.getStrategy()); + assertEquals(5, config.getStickySessionSeconds()); + assertTrue(config.isFailoverToPrimary()); + } + + @Test + void testToString() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("myPrimary") + .enabled(true) + .addReplica("replica1") + .build(); + + String str = config.toString(); + assertTrue(str.contains("myPrimary")); + assertTrue(str.contains("enabled=true")); + } + + @Test + void testMultipleReplicas() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .addReplica("replica1") + .addReplica("replica2") + .addReplica("replica3") + .build(); + + assertEquals(3, config.getReplicaCount()); + assertEquals(Arrays.asList("replica1", "replica2", "replica3"), config.getReplicaNames()); + } + + @Test + void testFailoverToPrimaryDisabled() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .failoverToPrimary(false) + .addReplica("replica1") + .build(); + + assertFalse(config.isFailoverToPrimary()); + } + + @Test + void testCustomStickySessionSeconds() { + ReadWriteConfiguration config = new ReadWriteConfiguration.Builder() + .primaryName("primary") + .stickySessionSeconds(10) + .addReplica("replica1") + .build(); + + assertEquals(10, config.getStickySessionSeconds()); + } +} diff --git 
a/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManagerTest.java b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManagerTest.java new file mode 100644 index 000000000..c5111e936 --- /dev/null +++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteDataSourceManagerTest.java @@ -0,0 +1,183 @@ +package org.openjproxy.grpc.server.readwrite; + +import com.openjproxy.grpc.ConnectionDetails; +import com.openjproxy.grpc.PropertyEntry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.sql.DataSource; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +/** + * Test cases for ReadWriteDataSourceManager. + */ +class ReadWriteDataSourceManagerTest { + + private ReadWriteDataSourceRegistry registry; + private ReadWriteDataSourceManager manager; + + @BeforeEach + void setUp() { + ReadWriteConfigurationParser.clearCache(); + registry = new ReadWriteDataSourceRegistry(); + registry.clear(); + manager = new ReadWriteDataSourceManager(registry); + } + + @AfterEach + void tearDown() { + if (registry != null) { + registry.clear(); + } + ReadWriteConfigurationParser.clearCache(); + } + + @Test + void testIsReadWriteSplittingEnabled_NoProperties() { + ConnectionDetails details = ConnectionDetails.newBuilder() + .setUrl("jdbc:h2:mem:test") + .build(); + + assertFalse(manager.isReadWriteSplittingEnabled(details, "testds")); + } + + @Test + void testIsReadWriteSplittingEnabled_NotConfigured() { + ConnectionDetails details = ConnectionDetails.newBuilder() + .setUrl("jdbc:h2:mem:test") + .addProperties(PropertyEntry.newBuilder() + .setKey("some.other.property") + .setStringValue("value") + .build()) + .build(); + + assertFalse(manager.isReadWriteSplittingEnabled(details, "testds")); + } + + @Test + void 
testIsReadWriteSplittingEnabled_ConfiguredButDisabled() { + ConnectionDetails details = ConnectionDetails.newBuilder() + .setUrl("jdbc:h2:mem:test") + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.enabled") + .setStringValue("false") + .build()) + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.role") + .setStringValue("primary") + .build()) + .build(); + + assertFalse(manager.isReadWriteSplittingEnabled(details, "testds")); + } + + @Test + void testIsReadWriteSplittingEnabled_EnabledWithoutReplicas() { + ConnectionDetails details = ConnectionDetails.newBuilder() + .setUrl("jdbc:h2:mem:test") + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.enabled") + .setStringValue("true") + .build()) + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.role") + .setStringValue("primary") + .build()) + .build(); + + // Without replicas configured, builder throws IllegalArgumentException + assertThrows(IllegalArgumentException.class, () -> manager.isReadWriteSplittingEnabled(details, "testds")); + } + + @Test + void testIsReadWriteSplittingEnabled_EnabledWithReplicas() { + ConnectionDetails details = ConnectionDetails.newBuilder() + .setUrl("jdbc:h2:mem:test") + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.enabled") + .setStringValue("true") + .build()) + .addProperties(PropertyEntry.newBuilder() + .setKey("testds.ojp.readwrite.role") + .setStringValue("primary") + .build()) + .addProperties(PropertyEntry.newBuilder() + .setKey("replica1.ojp.readwrite.role") + .setStringValue("replica") + .build()) + .addProperties(PropertyEntry.newBuilder() + .setKey("replica1.ojp.readwrite.primary") + .setStringValue("testds") + .build()) + .build(); + + assertTrue(manager.isReadWriteSplittingEnabled(details, "testds")); + } + + @Test + void testSetupReadWriteSplitting_NoConfiguration() { + ConnectionDetails details = ConnectionDetails.newBuilder() + 
.setUrl("jdbc:h2:mem:test")
+                .build();
+
+        DataSource ds = mock(DataSource.class);
+        ReadWriteConfiguration config = manager.setupReadWriteSplitting(
+                details, "conn123", ds, "testds");
+
+        assertNull(config);
+    }
+
+    @Test
+    void testSetupReadWriteSplitting_ValidConfiguration() {
+        ConnectionDetails details = ConnectionDetails.newBuilder()
+                .setUrl("jdbc:h2:mem:mgr_test_primary")
+                .addProperties(PropertyEntry.newBuilder()
+                        .setKey("mgr_test_primary.ojp.readwrite.enabled")
+                        .setStringValue("true")
+                        .build())
+                .addProperties(PropertyEntry.newBuilder()
+                        .setKey("mgr_test_primary.ojp.readwrite.role")
+                        .setStringValue("primary")
+                        .build())
+                .addProperties(PropertyEntry.newBuilder()
+                        .setKey("mgr_test_replica.ojp.readwrite.role")
+                        .setStringValue("replica")
+                        .build())
+                .addProperties(PropertyEntry.newBuilder()
+                        .setKey("mgr_test_replica.ojp.readwrite.primary")
+                        .setStringValue("mgr_test_primary")
+                        .build())
+                .addProperties(PropertyEntry.newBuilder()
+                        .setKey("mgr_test_replica.ojp.connection.url")
+                        .setStringValue("jdbc:h2:mem:mgr_test_replica;DB_CLOSE_DELAY=-1")
+                        .build())
+                .build();
+
+        DataSource ds = mock(DataSource.class);
+        ReadWriteConfiguration config = manager.setupReadWriteSplitting(
+                details, "conn123", ds, "mgr_test_primary");
+
+        assertNotNull(config);
+        assertTrue(config.isEnabled());
+        assertEquals("mgr_test_primary", config.getPrimaryName());
+        assertEquals(1, config.getReplicaNames().size());
+        assertEquals("mgr_test_replica", config.getReplicaNames().get(0));
+
+        String primaryName = registry.getPrimaryName("conn123");
+        assertNotNull(primaryName);
+        assertEquals("mgr_test_primary", primaryName);
+
+        List<DataSource> registeredReplicas = registry.getReplicas("mgr_test_primary");
+        assertNotNull(registeredReplicas);
+        assertEquals(1, registeredReplicas.size());
+    }
+
+    @Test
+    void testConstructorNullRegistry() {
+        assertThrows(NullPointerException.class, () -> new ReadWriteDataSourceManager(null));
+    }
+}
diff --git 
package org.openjproxy.grpc.server.readwrite;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.sql.DataSource;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

/**
 * Tests for ReadWriteDataSourceRegistry — sticky session and strategy features (Phase 2).
 */
class ReadWriteDataSourceRegistryTest {

    private ReadWriteDataSourceRegistry registry;

    @BeforeEach
    void setUp() {
        // Fresh registry per test: it holds mutable routing state.
        registry = new ReadWriteDataSourceRegistry();
    }

    // -----------------------------------------------------------------------
    // Strategy
    // -----------------------------------------------------------------------

    @Test
    void shouldDefaultToRoundRobinStrategyWhenNoneRegistered() {
        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN,
                registry.getStrategy("primary"));
    }

    @Test
    void shouldReturnRegisteredStrategy() {
        registry.registerStrategy("primary", ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM);
        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM, registry.getStrategy("primary"));
    }

    @Test
    void shouldThrowWhenRegisteringNullStrategy() {
        assertThrows(IllegalArgumentException.class, () -> registry.registerStrategy("primary", null));
    }

    @Test
    void shouldThrowWhenRegisteringStrategyForNullPrimary() {
        assertThrows(IllegalArgumentException.class,
                () -> registry.registerStrategy(null, ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM));
    }

    // -----------------------------------------------------------------------
    // Sticky session — isStickyActive
    // -----------------------------------------------------------------------

    @Test
    void shouldNotBeActiveWhenNoWriteHasOccurred() {
        registry.registerStickyTimeout("primary", 5);
        assertFalse(registry.isStickyActive("primary"));
    }

    @Test
    void shouldNotBeActiveWhenStickyTimeoutIsZero() {
        registry.registerStickyTimeout("primary", 0);
        registry.markWrite("primary");
        assertFalse(registry.isStickyActive("primary"));
    }

    @Test
    void shouldBeActiveImmediatelyAfterWrite() {
        registry.registerStickyTimeout("primary", 5);
        registry.markWrite("primary");
        assertTrue(registry.isStickyActive("primary"));
    }

    @Test
    void shouldNotBeActiveForNullPrimary() {
        assertFalse(registry.isStickyActive(null));
    }

    @Test
    void shouldTreatZeroStickyTimeoutAsAlreadyExpired() {
        // NOTE(review): the previous name ("shouldExpireAfterTimeoutElapses")
        // promised a time-based expiry check, but with timeout=0 this only
        // re-verifies the disabled boundary. A genuine elapsed-expiry test
        // would need an injectable clock in ReadWriteDataSourceRegistry —
        // TODO confirm whether a clock abstraction is planned.
        registry.registerStickyTimeout("primary", 0); // 0 = immediately inactive
        registry.markWrite("primary");
        assertFalse(registry.isStickyActive("primary"),
                "Sticky session with timeout=0 should not be active");
    }

    // -----------------------------------------------------------------------
    // clear()
    // -----------------------------------------------------------------------

    @Test
    void shouldClearAllDataIncludingPhase2State() {
        DataSource ds = mock(DataSource.class);
        registry.registerPrimaryMapping("hash1", "primary");
        registry.registerReplica("primary", ds);
        registry.registerStickyTimeout("primary", 10);
        registry.registerStrategy("primary", ReadWriteConfiguration.ReplicaSelectionStrategy.RANDOM);
        registry.markWrite("primary");

        registry.clear();

        // Everything — including Phase 2 sticky/strategy state — must fall
        // back to defaults after clear().
        assertNull(registry.getPrimaryName("hash1"));
        assertTrue(registry.getReplicas("primary").isEmpty());
        assertFalse(registry.isStickyActive("primary"));
        assertEquals(ReadWriteConfiguration.ReplicaSelectionStrategy.ROUND_ROBIN,
                registry.getStrategy("primary"));
    }
}
b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteSqlClassifierTest.java new file mode 100644 index 000000000..0eb0a4df0 --- /dev/null +++ b/ojp-server/src/test/java/org/openjproxy/grpc/server/readwrite/ReadWriteSqlClassifierTest.java @@ -0,0 +1,107 @@ +package org.openjproxy.grpc.server.readwrite; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for ReadWriteSqlClassifier + */ +class ReadWriteSqlClassifierTest { + + @Test + void shouldClassifySelectAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("SELECT * FROM users")); + } + + @Test + void shouldClassifySelectWithLeadingWhitespaceAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify(" SELECT id FROM orders")); + } + + @Test + void shouldClassifyLowercaseSelectAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("select count(*) from items")); + } + + @Test + void shouldClassifyWithClauseAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("WITH cte AS (SELECT 1) SELECT * FROM cte")); + } + + @Test + void shouldClassifyExplainAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("EXPLAIN SELECT * FROM users")); + } + + @Test + void shouldClassifyShowAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("SHOW TABLES")); + } + + @Test + void shouldClassifyDescribeAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("DESCRIBE users")); + } + + @Test + void shouldClassifyDescAbbreviationAsRead() { + assertEquals(ReadWriteSqlClassifier.QueryType.READ, + ReadWriteSqlClassifier.classify("DESC users")); + } + + @Test + void shouldClassifyInsertAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + 
ReadWriteSqlClassifier.classify("INSERT INTO users VALUES (1, 'alice')")); + } + + @Test + void shouldClassifyUpdateAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify("UPDATE users SET name = 'bob' WHERE id = 1")); + } + + @Test + void shouldClassifyDeleteAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify("DELETE FROM users WHERE id = 1")); + } + + @Test + void shouldClassifyCreateTableAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify("CREATE TABLE users (id INT)")); + } + + @Test + void shouldClassifyDropTableAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify("DROP TABLE users")); + } + + @Test + void shouldClassifyNullAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify(null)); + } + + @Test + void shouldClassifyBlankStringAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify(" ")); + } + + @Test + void shouldClassifyMergeAsWrite() { + assertEquals(ReadWriteSqlClassifier.QueryType.WRITE, + ReadWriteSqlClassifier.classify("MERGE INTO users USING src ON (users.id = src.id)")); + } +} diff --git a/spring-boot-starter-ojp/src/test/java/org/openjproxy/autoconfigure/OjpSystemPropertiesBridgeTest.java b/spring-boot-starter-ojp/src/test/java/org/openjproxy/autoconfigure/OjpSystemPropertiesBridgeTest.java index 76498fc23..332d93ae0 100644 --- a/spring-boot-starter-ojp/src/test/java/org/openjproxy/autoconfigure/OjpSystemPropertiesBridgeTest.java +++ b/spring-boot-starter-ojp/src/test/java/org/openjproxy/autoconfigure/OjpSystemPropertiesBridgeTest.java @@ -220,4 +220,81 @@ void shouldSetBothDefaultAndNamedPoolPropertiesFromSameEnvironment() { System.clearProperty(namedProp); } } + + // ---- read/write splitting properties ------------------------------------ + + @Test + void 
toSystemPropertyKeyShouldHandleReadWriteSplittingKeys() { + // camelCase property names pass through unchanged + assertEquals("primary.ojp.readwrite.enabled", + OjpSystemPropertiesBridge.toSystemPropertyKey("primary.ojp.readwrite.enabled")); + assertEquals("primary.ojp.readwrite.role", + OjpSystemPropertiesBridge.toSystemPropertyKey("primary.ojp.readwrite.role")); + assertEquals("replica1.ojp.readwrite.role", + OjpSystemPropertiesBridge.toSystemPropertyKey("replica1.ojp.readwrite.role")); + assertEquals("replica1.ojp.readwrite.primary", + OjpSystemPropertiesBridge.toSystemPropertyKey("replica1.ojp.readwrite.primary")); + assertEquals("replica1.ojp.connection.url", + OjpSystemPropertiesBridge.toSystemPropertyKey("replica1.ojp.connection.url")); + } + + @Test + void toSystemPropertyKeyShouldConvertKebabCaseReadWriteSplittingKeys() { + // kebab-case variants (as written in application.properties) are converted to camelCase + assertEquals("primary.ojp.readwrite.replicaSelectionStrategy", + OjpSystemPropertiesBridge.toSystemPropertyKey( + "primary.ojp.readwrite.replica-selection-strategy")); + assertEquals("primary.ojp.readwrite.stickySessionSeconds", + OjpSystemPropertiesBridge.toSystemPropertyKey( + "primary.ojp.readwrite.sticky-session-seconds")); + assertEquals("primary.ojp.readwrite.replicaFailoverToPrimary", + OjpSystemPropertiesBridge.toSystemPropertyKey( + "primary.ojp.readwrite.replica-failover-to-primary")); + } + + @Test + void shouldForwardReadWriteSplittingPropertiesAsSystemProperties() { + String propEnabled = "primary.ojp.readwrite.enabled"; + String propRole = "primary.ojp.readwrite.role"; + String propStrategy = "primary.ojp.readwrite.replicaSelectionStrategy"; + String propSticky = "primary.ojp.readwrite.stickySessionSeconds"; + String propR1Role = "replica1.ojp.readwrite.role"; + String propR1Primary = "replica1.ojp.readwrite.primary"; + String propR1Url = "replica1.ojp.connection.url"; + System.clearProperty(propEnabled); + 
System.clearProperty(propRole); + System.clearProperty(propStrategy); + System.clearProperty(propSticky); + System.clearProperty(propR1Role); + System.clearProperty(propR1Primary); + System.clearProperty(propR1Url); + try { + MockEnvironment env = new MockEnvironment(); + env.setProperty("primary.ojp.readwrite.enabled", "true"); + env.setProperty("primary.ojp.readwrite.role", "primary"); + env.setProperty("primary.ojp.readwrite.replica-selection-strategy", "ROUND_ROBIN"); + env.setProperty("primary.ojp.readwrite.sticky-session-seconds", "5"); + env.setProperty("replica1.ojp.readwrite.role", "replica"); + env.setProperty("replica1.ojp.readwrite.primary", "primary"); + env.setProperty("replica1.ojp.connection.url", "jdbc:postgresql://replica.host/db"); + + new OjpSystemPropertiesBridge(env).applySystemProperties(); + + assertEquals("true", System.getProperty(propEnabled)); + assertEquals("primary", System.getProperty(propRole)); + assertEquals("ROUND_ROBIN", System.getProperty(propStrategy)); + assertEquals("5", System.getProperty(propSticky)); + assertEquals("replica", System.getProperty(propR1Role)); + assertEquals("primary", System.getProperty(propR1Primary)); + assertEquals("jdbc:postgresql://replica.host/db", System.getProperty(propR1Url)); + } finally { + System.clearProperty(propEnabled); + System.clearProperty(propRole); + System.clearProperty(propStrategy); + System.clearProperty(propSticky); + System.clearProperty(propR1Role); + System.clearProperty(propR1Primary); + System.clearProperty(propR1Url); + } + } }