diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/DatastoreUri.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/DatastoreUri.kt new file mode 100644 index 00000000..8bed3c36 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/DatastoreUri.kt @@ -0,0 +1,36 @@ +package com.kakao.actionbase.engine.storage + +/** + * Utility for parsing datastore URIs. + * + * Format: datastore://{namespace}/{tableName} + */ +object DatastoreUri { + private const val PREFIX = "datastore://" + private val SAFE_NAME_PATTERN = Regex("^[a-z0-9_]+$") + + /** + * Parses a datastore URI and returns namespace and table name. + * + * @param uri The URI to parse (e.g., "datastore://my_namespace/my_table") + * @return Pair of (namespace, tableName) + * @throws IllegalArgumentException if URI format is invalid + */ + fun parse(uri: String): Pair { + require(uri.startsWith(PREFIX)) { + "Invalid datastore URI: $uri. Must start with '$PREFIX'" + } + val parts = uri.removePrefix(PREFIX).split("/") + require(parts.size == 2) { + "Invalid datastore URI: $uri. Expected format: datastore://{namespace}/{tableName}" + } + val (namespace, tableName) = parts[0] to parts[1] + require(namespace.isEmpty() || namespace.matches(SAFE_NAME_PATTERN)) { + "Invalid namespace: $namespace. Must contain only lowercase letters, digits, or underscore." + } + require(tableName.matches(SAFE_NAME_PATTERN)) { + "Invalid table name: $tableName. Must contain only lowercase letters, digits, or underscore." + } + return namespace to tableName + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageBackend.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageBackend.kt new file mode 100644 index 00000000..4aee0d01 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageBackend.kt @@ -0,0 +1,15 @@ +package com.kakao.actionbase.engine.storage + +import reactor.core.publisher.Mono + +interface StorageBackend : AutoCloseable { + fun getStorageTable( + namespace: String, + name: String, + ): Mono + + fun getStorageTable(uri: String): Mono { + val (ns, name) = DatastoreUri.parse(uri) + return getStorageTable(ns, name) + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageTable.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageTable.kt new file mode 100644 index 00000000..b1e00f40 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/StorageTable.kt @@ -0,0 +1,45 @@ +package com.kakao.actionbase.engine.storage + +import com.kakao.actionbase.core.storage.HBaseRecord +import com.kakao.actionbase.core.storage.MutationRequest + +import reactor.core.publisher.Mono + +interface StorageTable { + fun get(key: ByteArray): Mono + + fun get(keys: List): Mono> + + fun put( + key: ByteArray, + value: ByteArray, + ): Mono + + fun delete(key: ByteArray): Mono + + fun scan( + prefix: ByteArray, + limit: Int, + start: ByteArray?, + stop: ByteArray?, + ): Mono> + + fun increment( + key: ByteArray, + delta: Long, + ): Mono + + fun batch(requests: List): Mono + + fun exists(key: ByteArray): Mono + + fun setIfNotExists( + key: ByteArray, + value: ByteArray, + ): Mono + + fun deleteIfEquals( + key: ByteArray, + expectedValue: ByteArray, + ): Mono +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageBackend.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageBackend.kt new file mode 100644 index 00000000..61769f30 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageBackend.kt @@ -0,0 +1,33 @@ +package com.kakao.actionbase.engine.storage.memory + +import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore +import com.kakao.actionbase.engine.storage.StorageBackend +import com.kakao.actionbase.engine.storage.StorageTable + +import java.util.concurrent.ConcurrentHashMap + +import reactor.core.publisher.Mono + +class MemoryStorageBackend : StorageBackend { + private val stores = ConcurrentHashMap() + + private fun getOrCreateStore( + namespace: String, + name: String, + ): ByteArrayStore { + val key = "$namespace:$name" + return stores.computeIfAbsent(key) { ByteArrayStore() } + } + + override fun getStorageTable( + namespace: String, + name: String, + ): Mono { + val store = getOrCreateStore(namespace, name) + return Mono.just(MemoryStorageTable(store)) + } + + override fun close() { + // nothing to close + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageTable.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageTable.kt new file mode 100644 index 00000000..0e812ec9 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/storage/memory/MemoryStorageTable.kt @@ -0,0 +1,83 @@ +package com.kakao.actionbase.engine.storage.memory + +import com.kakao.actionbase.core.storage.HBaseRecord +import com.kakao.actionbase.core.storage.MutationRequest +import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore +import com.kakao.actionbase.engine.storage.StorageTable + +import reactor.core.publisher.Mono + +class MemoryStorageTable( + private val store: ByteArrayStore, +) : StorageTable { + override fun get(key: ByteArray): Mono = Mono.fromCallable { store[key] } + + override fun get(keys: List): Mono> = + Mono.fromCallable { + keys.mapNotNull { k -> store[k]?.let { HBaseRecord(key = k, value = it) } } + } + + override fun put( + key: ByteArray, + value: ByteArray, + ): Mono = Mono.fromCallable { store[key] = value }.then() + + override fun delete(key: ByteArray): Mono = Mono.fromCallable { store.remove(key) }.then() + + override fun scan( + prefix: ByteArray, + limit: Int, + start: ByteArray?, + stop: ByteArray?, + ): Mono> = + Mono.fromCallable { + store + .prefixScan(prefix) + .filter { record -> + val afterStart = start == null || compareByteArrays(record.key, start) >= 0 + val beforeStop = stop == null || compareByteArrays(record.key, stop) < 0 + afterStart && beforeStop + }.take(limit) + } + + private fun compareByteArrays( + a: ByteArray, + b: ByteArray, + ): Int { + val minLen = minOf(a.size, b.size) + for (i in 0 until minLen) { + val cmp = (a[i].toInt() and 0xFF) - (b[i].toInt() and 0xFF) + if (cmp != 0) return cmp + } + return a.size - b.size + } + + override fun increment( + key: ByteArray, + delta: Long, + ): Mono = Mono.fromCallable { store.increment(key, delta) } + + override fun batch(requests: List): Mono = + Mono + .fromCallable { + requests.forEach { + when (it) { + is MutationRequest.Put -> store[it.key] = it.value + is MutationRequest.Delete -> store.remove(it.key) + is MutationRequest.Increment -> store.increment(it.key, it.value) + } + } + }.then() + + override fun exists(key: ByteArray): Mono = Mono.fromCallable { store[key] != null } + + override fun setIfNotExists( + key: ByteArray, + value: ByteArray, + ): Mono = Mono.fromCallable { store.checkAndSet(key, null, value) } + + override fun deleteIfEquals( + key: ByteArray, + expectedValue: ByteArray, + ): Mono = Mono.fromCallable { store.checkAndSet(key, expectedValue, null) } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/DatastoreUriTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/DatastoreUriTest.kt new file mode 100644 index 00000000..960227e3 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/DatastoreUriTest.kt @@ -0,0 +1,86 @@ +package com.kakao.actionbase.engine.storage + +import com.kakao.actionbase.test.documentations.params.ObjectSource +import com.kakao.actionbase.test.documentations.params.ObjectSourceParameterizedTest + +import kotlin.test.assertEquals + +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.assertThrows + +class DatastoreUriTest { + @Nested + @DisplayName("parse") + inner class ParseTest { + @ObjectSourceParameterizedTest + @ObjectSource( + """ + - uri: datastore://my_namespace/my_table + namespace: my_namespace + table: my_table + - uri: datastore:///my_table + namespace: "" + table: my_table + - uri: datastore://my_namespace_1/my_table_2 + namespace: my_namespace_1 + table: my_table_2 + - uri: datastore://ns/t + namespace: ns + table: t + """, + ) + fun `valid URI`( + uri: String, + namespace: String, + table: String, + ) { + val (ns, tbl) = DatastoreUri.parse(uri) + assertEquals(namespace, ns) + assertEquals(table, tbl) + } + + @ObjectSourceParameterizedTest + @ObjectSource( + """ + # missing or wrong prefix + - uri: "" + error: Must start with + - uri: invalid://ns/table + error: Must start with + - uri: ns/table + error: Must start with + + # wrong number of path segments + - uri: datastore://ns + error: Expected format + - uri: datastore://ns/table/extra + error: Expected format + + # invalid characters + - uri: datastore://name space/table + error: Invalid namespace + - uri: datastore://ns/table;drop + error: Invalid table name + + # uppercase not allowed + - uri: datastore://MyNamespace/table + error: Invalid namespace + + # hyphen not allowed + - uri: datastore://ns/my-table + error: Invalid table name + """, + ) + fun `invalid URI`( + uri: String, + error: String, + ) { + assertThrows { + DatastoreUri.parse(uri) + }.also { + assert(it.message!!.contains(error)) + } + } + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageBackendTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageBackendTest.kt new file mode 100644 index 00000000..a10ddc80 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageBackendTest.kt @@ -0,0 +1,97 @@ +package com.kakao.actionbase.engine.storage + +import com.kakao.actionbase.engine.storage.memory.MemoryStorageBackend +import com.kakao.actionbase.test.documentations.params.ObjectSource +import com.kakao.actionbase.test.documentations.params.ObjectSourceParameterizedTest + +import kotlin.test.assertNotNull + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test + +class MemoryStorageBackendTest { + private lateinit var backend: MemoryStorageBackend + + @BeforeEach + fun setUp() { + backend = MemoryStorageBackend() + } + + @AfterEach + fun tearDown() { + backend.close() + } + + @Nested + @DisplayName("getStorageTable") + inner class GetStorageTableTest { + @ObjectSourceParameterizedTest + @ObjectSource( + """ + - namespace: test_ns + name: test_table + - namespace: ns1 + name: table1 + - namespace: "" + name: edges + """, + ) + fun `returns StorageTable`( + namespace: String, + name: String, + ) { + val table = backend.getStorageTable(namespace, name).block()!! + assertNotNull(table) + } + + @ObjectSourceParameterizedTest + @ObjectSource( + """ + - uri: datastore://test_ns/test_table + - uri: datastore://ns1/table1 + """, + ) + fun `returns StorageTable with uri`(uri: String) { + val table = backend.getStorageTable(uri).block()!! + assertNotNull(table) + } + + @Test + fun `different tables are isolated from each other`() { + val table1 = backend.getStorageTable("ns1", "table1").block()!! + val table2 = backend.getStorageTable("ns2", "table2").block()!! + val key = "same_key".toByteArray() + + table1.put(key, "v1".toByteArray()).block() + table2.put(key, "v2".toByteArray()).block() + + assert(String(table1.get(key).block()!!) == "v1") { "table1 should have v1" } + assert(String(table2.get(key).block()!!) == "v2") { "table2 should have v2" } + } + + @Test + fun `same namespace and name returns same store`() { + val table1 = backend.getStorageTable("ns", "table").block()!! + val table2 = backend.getStorageTable("ns", "table").block()!! + + table1.put("key".toByteArray(), "value".toByteArray()).block() + + assert(String(table2.get("key".toByteArray()).block()!!) == "value") { + "same namespace+name should share store" + } + } + } + + @Nested + @DisplayName("close") + inner class CloseTest { + @Test + fun `close is idempotent`() { + backend.close() + backend.close() // Should not throw + } + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageTableCompatibilityTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageTableCompatibilityTest.kt new file mode 100644 index 00000000..af86ed46 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/MemoryStorageTableCompatibilityTest.kt @@ -0,0 +1,9 @@ +package com.kakao.actionbase.engine.storage + +import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore +import com.kakao.actionbase.engine.storage.memory.MemoryStorageTable + +/** Memory (ByteArrayStore) compatibility test for StorageTable. */ +class MemoryStorageTableCompatibilityTest : StorageTableCompatibilityTest() { + override fun createTable(): StorageTable = MemoryStorageTable(ByteArrayStore()) +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/StorageTableCompatibilityTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/StorageTableCompatibilityTest.kt new file mode 100644 index 00000000..7c1812e3 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/storage/StorageTableCompatibilityTest.kt @@ -0,0 +1,379 @@ +package com.kakao.actionbase.engine.storage + +import com.kakao.actionbase.core.storage.MutationRequest +import com.kakao.actionbase.test.documentations.params.ObjectSource +import com.kakao.actionbase.test.documentations.params.ObjectSourceParameterizedTest + +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger + +import org.junit.jupiter.api.Assumptions.assumeTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test + +/** + * Abstract compatibility test for StorageTable implementations. + * + * Required operations: get, scan, put, delete, increment, batch, checkAndMutate. + */ +abstract class StorageTableCompatibilityTest { + protected abstract fun createTable(): StorageTable + + protected open fun supportsCheckAndMutate(): Boolean = true + + protected open fun supportsScanLimit(): Boolean = true + + protected open fun supportsIncrement(): Boolean = true + + private lateinit var table: StorageTable + + @BeforeEach + fun setUp() { + table = createTable() + } + + @Nested + @DisplayName("get") + inner class GetTest { + @ObjectSourceParameterizedTest + @ObjectSource( + """ + - key: key1 + value: value1 + - key: k + value: v + - key: long_key_name + value: long_value + """, + ) + fun `returns stored value`( + key: String, + value: String, + ) { + table.put(b(key), b(value)).block() + assert(String(table.get(b(key)).block()!!) == value) + } + + @Test + fun `returns null when key not exists`() { + assert(table.get(b("missing")).block() == null) + } + + @ObjectSourceParameterizedTest + @ObjectSource( + """ + # all keys exist + - keys: [k1, k2] + values: [v1, v2] + expected: 2 + + # some keys missing + - keys: [exists, missing] + values: [v] + expected: 1 + """, + ) + fun `get all`( + keys: List, + values: List, + expected: Int, + ) { + keys.zip(values).forEach { (k, v) -> table.put(b(k), b(v)).block() } + assert(table.get(keys.map { b(it) }).block()!!.size == expected) + } + } + + @Nested + @DisplayName("scan") + inner class ScanTest { + @BeforeEach + fun setup() { + listOf("user:001:a", "user:001:b", "user:002:a", "post:001").forEach { + table.put(b(it), b("v")).block() + } + } + + @ObjectSourceParameterizedTest + @ObjectSource( + """ + - prefix: "user:001" + expected: 2 + - prefix: "user:" + expected: 3 + - prefix: "post:" + expected: 1 + - prefix: nonexistent + expected: 0 + """, + ) + fun `returns matching prefix`( + prefix: String, + expected: Int, + ) { + val results = table.scan(b(prefix), 100, null, null).block()!! + assert(results.size == expected) { "prefix=$prefix: expected $expected but got ${results.size}" } + } + + @Test + fun `returns sorted keys`() { + val keys = table.scan(b("user:"), 100, null, null).block()!!.map { String(it.key) } + assert(keys == keys.sorted()) + } + + @Test + fun `respects limit`() { + assumeTrue(supportsScanLimit()) + assert(table.scan(b("user:"), 2, null, null).block()!!.size == 2) + } + } + + @Nested + @DisplayName("put") + inner class PutTest { + @Test + fun `stores value`() { + table.put(b("k"), b("v")).block() + assert(table.get(b("k")).block()?.contentEquals(b("v")) == true) + } + + @Test + fun `overwrites existing`() { + table.put(b("k"), b("old")).block() + table.put(b("k"), b("new")).block() + assert(String(table.get(b("k")).block()!!) == "new") + } + } + + @Nested + @DisplayName("delete") + inner class DeleteTest { + @Test + fun `removes key`() { + table.put(b("k"), b("v")).block() + table.delete(b("k")).block() + assert(table.get(b("k")).block() == null) + } + + @Test + fun `silently succeeds for missing key`() { + table.delete(b("nonexistent")).block() + } + } + + @Nested + @DisplayName("increment") + inner class IncrementTest { + @BeforeEach + fun checkSupport() { + assumeTrue(supportsIncrement()) + } + + @ObjectSourceParameterizedTest + @ObjectSource( + """ + # new counter + - initial: 0 + delta: 10 + expected: 10 + + # add to existing + - initial: 100 + delta: 50 + expected: 150 + + # decrement + - initial: 100 + delta: -30 + expected: 70 + """, + ) + fun `increment counter`( + initial: Long, + delta: Long, + expected: Long, + ) { + if (initial != 0L) { + table.put(b("cnt"), longToBytes(initial)).block() + } + assert(table.increment(b("cnt"), delta).block() == expected) + } + } + + @Nested + @DisplayName("batch") + inner class BatchTest { + @Test + fun `executes puts`() { + table.batch(listOf(MutationRequest.Put(b("b1"), b("v1")), MutationRequest.Put(b("b2"), b("v2")))).block() + assert(table.get(listOf(b("b1"), b("b2"))).block()!!.size == 2) + } + + @Test + fun `executes deletes`() { + table.put(b("d1"), b("v")).block() + table.put(b("d2"), b("v")).block() + table.batch(listOf(MutationRequest.Delete(b("d1")), MutationRequest.Delete(b("d2")))).block() + assert(table.get(listOf(b("d1"), b("d2"))).block()!!.isEmpty()) + } + + @Test + fun `executes increments`() { + assumeTrue(supportsIncrement()) + table.batch(listOf(MutationRequest.Increment(b("c1"), 10), MutationRequest.Increment(b("c2"), 20))).block() + assert(bytesToLong(table.get(b("c1")).block()!!) == 10L) + assert(bytesToLong(table.get(b("c2")).block()!!) == 20L) + } + + @Test + fun `executes mixed mutations`() { + assumeTrue(supportsIncrement()) + table.put(b("to-delete"), b("v")).block() + table + .batch( + listOf( + MutationRequest.Put(b("new"), b("v")), + MutationRequest.Delete(b("to-delete")), + MutationRequest.Increment(b("cnt"), 100), + ), + ).block() + assert(table.get(b("new")).block() != null) + assert(table.get(b("to-delete")).block() == null) + assert(bytesToLong(table.get(b("cnt")).block()!!) == 100L) + } + } + + @Nested + @DisplayName("exists") + inner class ExistsTest { + @Test + fun `returns true when key exists`() { + table.put(b("k"), b("v")).block() + assert(table.exists(b("k")).block() == true) + } + + @Test + fun `returns false when key not exists`() { + assert(table.exists(b("missing")).block() == false) + } + } + + @Nested + @DisplayName("checkAndMutate") + inner class CheckAndMutateTest { + @BeforeEach + fun checkSupport() { + assumeTrue(supportsCheckAndMutate()) + } + + @Nested + @DisplayName("setIfNotExists") + inner class SetIfNotExistsTest { + @Test + fun `succeeds when key not exists`() { + assert(table.setIfNotExists(b("lock"), b("owner")).block() == true) + assert(table.get(b("lock")).block()?.contentEquals(b("owner")) == true) + } + + @Test + fun `fails when key exists`() { + table.put(b("lock"), b("existing")).block() + assert(table.setIfNotExists(b("lock"), b("new")).block() == false) + assert(String(table.get(b("lock")).block()!!) == "existing") + } + } + + @Nested + @DisplayName("deleteIfEquals") + inner class DeleteIfEqualsTest { + @Test + fun `succeeds when value matches`() { + table.put(b("lock"), b("owner")).block() + assert(table.deleteIfEquals(b("lock"), b("owner")).block() == true) + assert(table.get(b("lock")).block() == null) + } + + @Test + fun `fails when value differs`() { + table.put(b("lock"), b("owner")).block() + assert(table.deleteIfEquals(b("lock"), b("different")).block() == false) + assert(table.get(b("lock")).block() != null) + } + + @Test + fun `fails when key not exists`() { + assert(table.deleteIfEquals(b("missing"), b("v")).block() == false) + } + } + + @Nested + @DisplayName("concurrent") + inner class ConcurrentTest { + @Test + fun `only one thread acquires lock`() { + val threads = 10 + val acquired = AtomicInteger(0) + val latch = CountDownLatch(threads) + val executor = Executors.newFixedThreadPool(threads) + + repeat(threads) { i -> + executor.submit { + try { + if (table.setIfNotExists(b("lock"), b("owner-$i")).block() == true) { + acquired.incrementAndGet() + } + } finally { + latch.countDown() + } + } + } + + latch.await() + executor.shutdown() + assert(acquired.get() == 1) { "Expected 1 but got ${acquired.get()}" } + } + + @Test + fun `only owner releases lock`() { + table.put(b("lock"), b("owner-0")).block() + val threads = 10 + val released = AtomicInteger(0) + val latch = CountDownLatch(threads) + val executor = Executors.newFixedThreadPool(threads) + + repeat(threads) { i -> + executor.submit { + try { + if (table.deleteIfEquals(b("lock"), b("owner-$i")).block() == true) { + released.incrementAndGet() + } + } finally { + latch.countDown() + } + } + } + + latch.await() + executor.shutdown() + assert(released.get() == 1) { "Expected 1 but got ${released.get()}" } + } + } + } + + companion object { + fun b(s: String): ByteArray = s.toByteArray() + + fun longToBytes(v: Long): ByteArray = + ByteBuffer + .allocate(8) + .order(ByteOrder.BIG_ENDIAN) + .putLong(v) + .array() + + fun bytesToLong(b: ByteArray): Long = ByteBuffer.wrap(b).order(ByteOrder.BIG_ENDIAN).long + } +}