Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.kakao.actionbase.engine.storage

import com.kakao.actionbase.engine.storage.hbase.HBaseStorageBackend
import com.kakao.actionbase.engine.storage.hbase.MockHBaseStorageBackend
import com.kakao.actionbase.engine.storage.memory.MemoryStorageBackend

import org.slf4j.LoggerFactory

/**
* Factory for creating StorageBackend instances.
*
* Thread-safety: This factory is designed to be initialized once at application startup.
* The initialize() method is synchronized to prevent race conditions during initialization.
* Once initialized, the factory cannot be re-initialized.
*
* Usage:
* ```yaml
* hbase:
* type: memory # memory | embedded | hbase (default)
* ```
*/
object DefaultStorageBackendFactory {
private val logger = LoggerFactory.getLogger(DefaultStorageBackendFactory::class.java)

@Volatile
private var instance0: StorageBackend? = null

@Volatile
private var defaultNamespace0: String = "default"

val INSTANCE: StorageBackend
get() = instance0 ?: throw IllegalStateException("StorageBackend not initialized. Call initialize() first.")

val defaultNamespace: String
get() = defaultNamespace0

val isInitialized: Boolean
get() = instance0 != null

/**
* Initializes the storage backend based on the provided properties.
* If already initialized, this method does nothing (idempotent).
*
* @param properties Configuration properties including:
* - type: Backend type (memory, embedded, hbase). Defaults to "hbase".
* - For HBase type, see HBaseStorageBackend.create for additional properties.
*/
@Synchronized
fun initialize(properties: Map<String, String>) {
if (isInitialized) {
logger.debug("StorageBackend already initialized, skipping")
return
}
val type = properties["type"] ?: "hbase"
defaultNamespace0 = properties["namespace"] ?: "default"
logger.info("Initializing StorageBackend with type: {}, namespace: {}", type, defaultNamespace0)

instance0 =
when (type) {
"memory" -> {
logger.info("Using MemoryStorageBackend")
MemoryStorageBackend()
}
"embedded" -> {
logger.info("Using MockHBaseStorageBackend (embedded)")
MockHBaseStorageBackend()
}
else -> {
if (properties.isEmpty() || properties["version"] == "embedded") {
logger.info("🚀 - Using Embedded Mock Storage (legacy)")
MockHBaseStorageBackend()
} else {
logger.info("Using HBaseStorageBackend")
HBaseStorageBackend.create(properties)
}
}
}
}

/**
* Initializes the factory with a pre-created StorageBackend instance.
* If already initialized, this method does nothing (idempotent).
*
* @param backend The StorageBackend instance to use.
* @param namespace The default namespace to use.
*/
@Synchronized
fun initialize(
backend: StorageBackend,
namespace: String = "default",
) {
if (isInitialized) {
logger.debug("StorageBackend already initialized, skipping")
return
}
logger.info("Initializing StorageBackend with provided instance: {}, namespace: {}", backend::class.simpleName, namespace)
instance0 = backend
defaultNamespace0 = namespace
}

@Synchronized
fun close() {
instance0?.close()
instance0 = null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.kakao.actionbase.engine.storage.hbase

import com.kakao.actionbase.engine.storage.StorageBackend
import com.kakao.actionbase.engine.storage.StorageTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseConnections
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTable
import com.kakao.actionbase.v2.engine.storage.hbase.impl.NewMockTable

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.mock.MockHTable

import reactor.core.publisher.Mono

/**
* Mock HBase storage backend for testing and embedded mode.
* Uses HBase MockHTable for storage operations.
*
* Each namespace + name combination gets its own isolated table.
*/
class MockHBaseStorageBackend : StorageBackend {
override fun getStorageTable(
namespace: String,
name: String,
): Mono<StorageTable> {
val hbaseTable = createMockHBaseTable(namespace, name)
return Mono.just(HBaseStorageTable(hbaseTable))
}

override fun close() {
// nothing to close
}

/**
* Creates a mock HBase table with proper namespace:name isolation.
*/
private fun createMockHBaseTable(
namespace: String,
name: String,
): HBaseTable {
val conn = HBaseConnections.getMockConnection(namespace)
val tableName = if (name.isEmpty()) "edges" else name
val mockTable = conn.getTable(TableName.valueOf(tableName)) as MockHTable
val table = NewMockTable(mockTable)
return HBaseTable.create(table)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.kakao.actionbase.engine.storage

import com.kakao.actionbase.test.hbase.HBaseTestingClusterExtension

import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith

/**
* Tests for DefaultStorageBackendFactory.
*
* Uses HBaseTestingClusterExtension to ensure consistent initialization
* with the HBase testing backend across all tests.
*/
@ExtendWith(HBaseTestingClusterExtension::class)
class DefaultStorageBackendFactoryTest {
@Nested
@DisplayName("initialize")
inner class InitializeTest {
@Test
fun `initialize is idempotent`() {
// Extension already initialized - second call should not throw
DefaultStorageBackendFactory.initialize(mapOf("type" to "memory"))
DefaultStorageBackendFactory.initialize(mapOf("type" to "embedded"))

assert(DefaultStorageBackendFactory.isInitialized)
}
}

@Nested
@DisplayName("isInitialized")
inner class IsInitializedTest {
@Test
fun `returns true after initialization`() {
assert(DefaultStorageBackendFactory.isInitialized)
}
}

@Nested
@DisplayName("close")
inner class CloseTest {
@Test
fun `close is idempotent`() {
DefaultStorageBackendFactory.close()
DefaultStorageBackendFactory.close() // Should not throw
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.kakao.actionbase.test.hbase

import com.kakao.actionbase.engine.storage.DefaultStorageBackendFactory
import com.kakao.actionbase.v2.engine.compat.DefaultHBaseCluster

import org.apache.hadoop.hbase.client.AsyncConnection
Expand Down Expand Up @@ -27,6 +28,9 @@ class HBaseTestingClusterExtension :
override fun beforeAll(context: ExtensionContext) {
HBaseTestingCluster.startIfNeeded()
DefaultHBaseCluster.initialize(Mono.just(HBaseTestingCluster.asyncConnection), "ab_test", HBaseTestingCluster.hbaseConfiguration)
// Initialize DefaultStorageBackendFactory with the HBase testing cluster (idempotent)
val testingBackend = HBaseTestingStorageBackend(Mono.just(HBaseTestingCluster.asyncConnection), "ab_test")
DefaultStorageBackendFactory.initialize(testingBackend, "ab_test")
}

override fun supportsParameter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.kakao.actionbase.test.hbase

import com.kakao.actionbase.engine.storage.StorageBackend
import com.kakao.actionbase.engine.storage.StorageTable
import com.kakao.actionbase.engine.storage.hbase.HBaseStorageTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTable

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.AsyncConnection

import reactor.core.publisher.Mono

/**
* Storage backend that uses the HBase testing cluster.
* This backend creates tables using the provided AsyncConnection.
*/
class HBaseTestingStorageBackend(
private val connectionMono: Mono<AsyncConnection>,
private val defaultNamespace: String,
) : StorageBackend {
override fun getStorageTable(
namespace: String,
name: String,
): Mono<StorageTable> {
val effectiveNs = namespace.ifEmpty { defaultNamespace }
return connectionMono.map { conn ->
val tableName = TableName.valueOf(effectiveNs, name)
val asyncTable = conn.getTable(tableName)
val hbaseTable = HBaseTable.create(asyncTable)
HBaseStorageTable(hbaseTable)
}
}

override fun close() {
// Connection is managed by HBaseTestingCluster
}
}
Loading