Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.kakao.actionbase.engine.storage

/**
* Utility for parsing datastore URIs.
*
* Format: datastore://{namespace}/{tableName}
*/
object DatastoreUri {
private const val PREFIX = "datastore://"
private val SAFE_NAME_PATTERN = Regex("^[a-z0-9_]+$")

/**
* Parses a datastore URI and returns namespace and table name.
*
* @param uri The URI to parse (e.g., "datastore://my_namespace/my_table")
* @return Pair of (namespace, tableName)
* @throws IllegalArgumentException if URI format is invalid
*/
fun parse(uri: String): Pair<String, String> {
require(uri.startsWith(PREFIX)) {
"Invalid datastore URI: $uri. Must start with '$PREFIX'"
}
val parts = uri.removePrefix(PREFIX).split("/")
require(parts.size == 2) {
"Invalid datastore URI: $uri. Expected format: datastore://{namespace}/{tableName}"
}
val (namespace, tableName) = parts[0] to parts[1]
require(namespace.isEmpty() || namespace.matches(SAFE_NAME_PATTERN)) {
"Invalid namespace: $namespace. Must contain only lowercase letters, digits, or underscore."
}
require(tableName.matches(SAFE_NAME_PATTERN)) {
"Invalid table name: $tableName. Must contain only lowercase letters, digits, or underscore."
}
return namespace to tableName
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.kakao.actionbase.engine.storage

import reactor.core.publisher.Mono

interface StorageBackend : AutoCloseable {
fun getStorageTable(
namespace: String,
name: String,
): Mono<StorageTable>

fun getStorageTable(uri: String): Mono<StorageTable> {
val (ns, name) = DatastoreUri.parse(uri)
return getStorageTable(ns, name)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.kakao.actionbase.engine.storage

import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest

import reactor.core.publisher.Mono

interface StorageTable {
fun get(key: ByteArray): Mono<ByteArray?>

fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>>

fun put(
key: ByteArray,
value: ByteArray,
): Mono<Void>

fun delete(key: ByteArray): Mono<Void>

fun scan(
prefix: ByteArray,
limit: Int,
start: ByteArray?,
stop: ByteArray?,
): Mono<List<HBaseRecord>>

fun increment(
key: ByteArray,
delta: Long,
): Mono<Long>

fun batch(requests: List<MutationRequest>): Mono<Void>

fun exists(key: ByteArray): Mono<Boolean>

fun setIfNotExists(
key: ByteArray,
value: ByteArray,
): Mono<Boolean>

fun deleteIfEquals(
key: ByteArray,
expectedValue: ByteArray,
): Mono<Boolean>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.kakao.actionbase.engine.storage.memory

import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.engine.storage.StorageBackend
import com.kakao.actionbase.engine.storage.StorageTable

import java.util.concurrent.ConcurrentHashMap

import reactor.core.publisher.Mono

class MemoryStorageBackend : StorageBackend {
private val stores = ConcurrentHashMap<String, ByteArrayStore>()

private fun getOrCreateStore(
namespace: String,
name: String,
): ByteArrayStore {
val key = "$namespace:$name"
return stores.computeIfAbsent(key) { ByteArrayStore() }
}

override fun getStorageTable(
namespace: String,
name: String,
): Mono<StorageTable> {
val store = getOrCreateStore(namespace, name)
return Mono.just(MemoryStorageTable(store))
}

override fun close() {
// nothing to close
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.kakao.actionbase.engine.storage.memory

import com.kakao.actionbase.core.storage.HBaseRecord
import com.kakao.actionbase.core.storage.MutationRequest
import com.kakao.actionbase.engine.datastore.impl.ByteArrayStore
import com.kakao.actionbase.engine.storage.StorageTable

import reactor.core.publisher.Mono

class MemoryStorageTable(
private val store: ByteArrayStore,
) : StorageTable {
override fun get(key: ByteArray): Mono<ByteArray?> = Mono.fromCallable { store[key] }

override fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>> =
Mono.fromCallable {
keys.mapNotNull { k -> store[k]?.let { HBaseRecord(key = k, value = it) } }
}

override fun put(
key: ByteArray,
value: ByteArray,
): Mono<Void> = Mono.fromCallable { store[key] = value }.then()

override fun delete(key: ByteArray): Mono<Void> = Mono.fromCallable { store.remove(key) }.then()

override fun scan(
prefix: ByteArray,
limit: Int,
start: ByteArray?,
stop: ByteArray?,
): Mono<List<HBaseRecord>> =
Mono.fromCallable {
store
.prefixScan(prefix)
.filter { record ->
val afterStart = start == null || compareByteArrays(record.key, start) >= 0
val beforeStop = stop == null || compareByteArrays(record.key, stop) < 0
afterStart && beforeStop
}.take(limit)
}

private fun compareByteArrays(
a: ByteArray,
b: ByteArray,
): Int {
val minLen = minOf(a.size, b.size)
for (i in 0 until minLen) {
val cmp = (a[i].toInt() and 0xFF) - (b[i].toInt() and 0xFF)
if (cmp != 0) return cmp
}
return a.size - b.size
}

override fun increment(
key: ByteArray,
delta: Long,
): Mono<Long> = Mono.fromCallable { store.increment(key, delta) }

override fun batch(requests: List<MutationRequest>): Mono<Void> =
Mono
.fromCallable {
requests.forEach {
when (it) {
is MutationRequest.Put -> store[it.key] = it.value
is MutationRequest.Delete -> store.remove(it.key)
is MutationRequest.Increment -> store.increment(it.key, it.value)
}
}
}.then()

override fun exists(key: ByteArray): Mono<Boolean> = Mono.fromCallable { store[key] != null }

override fun setIfNotExists(
key: ByteArray,
value: ByteArray,
): Mono<Boolean> = Mono.fromCallable { store.checkAndSet(key, null, value) }

override fun deleteIfEquals(
key: ByteArray,
expectedValue: ByteArray,
): Mono<Boolean> = Mono.fromCallable { store.checkAndSet(key, expectedValue, null) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.kakao.actionbase.engine.storage

import com.kakao.actionbase.test.documentations.params.ObjectSource
import com.kakao.actionbase.test.documentations.params.ObjectSourceParameterizedTest

import kotlin.test.assertEquals

import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.assertThrows

class DatastoreUriTest {
@Nested
@DisplayName("parse")
inner class ParseTest {
@ObjectSourceParameterizedTest
@ObjectSource(
"""
- uri: datastore://my_namespace/my_table
namespace: my_namespace
table: my_table
- uri: datastore:///my_table
namespace: ""
table: my_table
- uri: datastore://my_namespace_1/my_table_2
namespace: my_namespace_1
table: my_table_2
- uri: datastore://ns/t
namespace: ns
table: t
""",
)
fun `valid URI`(
uri: String,
namespace: String,
table: String,
) {
val (ns, tbl) = DatastoreUri.parse(uri)
assertEquals(namespace, ns)
assertEquals(table, tbl)
}

@ObjectSourceParameterizedTest
@ObjectSource(
"""
# missing or wrong prefix
- uri: ""
error: Must start with
- uri: invalid://ns/table
error: Must start with
- uri: ns/table
error: Must start with

# wrong number of path segments
- uri: datastore://ns
error: Expected format
- uri: datastore://ns/table/extra
error: Expected format

# invalid characters
- uri: datastore://name space/table
error: Invalid namespace
- uri: datastore://ns/table;drop
error: Invalid table name

# uppercase not allowed
- uri: datastore://MyNamespace/table
error: Invalid namespace

# hyphen not allowed
- uri: datastore://ns/my-table
error: Invalid table name
""",
)
fun `invalid URI`(
uri: String,
error: String,
) {
assertThrows<IllegalArgumentException> {
DatastoreUri.parse(uri)
}.also {
assert(it.message!!.contains(error))
}
}
}
}
Loading
Loading