diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/Engine.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/Engine.kt new file mode 100644 index 00000000..ec78fdba --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/Engine.kt @@ -0,0 +1,32 @@ +package com.kakao.actionbase.engine + +import com.kakao.actionbase.engine.catalog.Catalog +import com.kakao.actionbase.engine.catalog.PeriodicCatalog + +import java.time.Duration + +class Engine( + private val catalog: Catalog, +) : AutoCloseable { + init { + catalog.bind(this) + } + + override fun close() { + catalog.close() + } + + companion object { + fun create( + catalogReloadInitialDelay: Duration = Duration.ZERO, + catalogReloadInterval: Duration? = null, + ): Engine { + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay, + catalogReloadInterval, + ) + return Engine(catalog) + } + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/MutationEngine.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/MutationEngine.kt index f6635d2c..d4ff3f1d 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/MutationEngine.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/MutationEngine.kt @@ -2,7 +2,7 @@ package com.kakao.actionbase.engine import com.kakao.actionbase.core.edge.MutationEvent import com.kakao.actionbase.core.state.State -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.metadata.MutationMode import reactor.core.publisher.Mono @@ -13,12 +13,12 @@ import reactor.core.publisher.Mono */ interface MutationEngine { /** - * Resolves a table binding from a database/alias pair. + * Resolves a table from a database/alias pair. */ - fun getTableBinding( + fun getTable( database: String, alias: String, - ): TableBinding + ): Table fun writeWal( ctx: MutationContext, diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/QueryEngine.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/QueryEngine.kt index c1946101..22603c84 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/QueryEngine.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/QueryEngine.kt @@ -1,6 +1,6 @@ package com.kakao.actionbase.engine -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.query.ActionbaseQuery import com.kakao.actionbase.v2.engine.sql.DataFrame @@ -11,10 +11,10 @@ import reactor.core.publisher.Mono * Decouples QueryService from Graph by exposing only the operations it needs. */ interface QueryEngine { - fun getTableBinding( + fun getTable( database: String, alias: String, - ): TableBinding + ): Table fun query(request: ActionbaseQuery): Mono> } diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Alias.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Alias.kt new file mode 100644 index 00000000..6b314f15 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Alias.kt @@ -0,0 +1,10 @@ +package com.kakao.actionbase.engine.catalog + +import com.kakao.actionbase.core.metadata.AliasDescriptor + +interface Alias { + val descriptor: AliasDescriptor + + /** The table this alias resolves to, already materialized. */ + val table: Table +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Catalog.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Catalog.kt new file mode 100644 index 00000000..b1dd085f --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Catalog.kt @@ -0,0 +1,15 @@ +package com.kakao.actionbase.engine.catalog + +import com.kakao.actionbase.core.metadata.DatabaseId +import com.kakao.actionbase.core.metadata.TableId +import com.kakao.actionbase.engine.Engine + +interface Catalog : AutoCloseable { + fun bind(engine: Engine) + + val databases: Map + + val tables: Map + + val aliases: Map +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Database.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Database.kt new file mode 100644 index 00000000..22c61352 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Database.kt @@ -0,0 +1,7 @@ +package com.kakao.actionbase.engine.catalog + +import com.kakao.actionbase.core.metadata.DatabaseDescriptor + +interface Database { + val descriptor: DatabaseDescriptor +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalog.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalog.kt new file mode 100644 index 00000000..e6fa664c --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalog.kt @@ -0,0 +1,121 @@ +package com.kakao.actionbase.engine.catalog + +import com.kakao.actionbase.core.metadata.DatabaseId +import com.kakao.actionbase.core.metadata.TableId +import com.kakao.actionbase.engine.Engine + +import java.time.Duration +import java.time.Instant + +import org.slf4j.LoggerFactory + +import reactor.core.Disposable +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers + +class PeriodicCatalog( + private val catalogReloadInitialDelay: Duration, + private val catalogReloadInterval: Duration?, +) : Catalog { + // --- state --- + @Volatile private var snapshot: Snapshot = Snapshot.EMPTY + + @Volatile private var reloadCount: Long = 0 + + @Volatile private var lastReloadAt: Instant? = null + + @Volatile private var engine: Engine? = null + + private var bound = false + + private var disposable: Disposable? = null + + // --- public views --- + override val databases: Map get() = snapshot.databases + override val tables: Map get() = snapshot.tables + override val aliases: Map get() = snapshot.aliases + + @Synchronized + override fun bind(engine: Engine) { + if (bound) { + log.warn("PeriodicCatalog already bound") + return + } + this.engine = engine + bound = true + + val interval = catalogReloadInterval + if (interval == null) { + log.info( + "catalog periodic reload disabled; one-shot reload after {} ms.", + catalogReloadInitialDelay.toMillis(), + ) + } else { + log.info( + "Starting Flux.interval for reloading catalog every {} ms after {} ms delay.", + interval.toMillis(), + catalogReloadInitialDelay.toMillis(), + ) + } + + val source: Flux = + if (interval == null) { + Mono.delay(catalogReloadInitialDelay).flux() + } else { + Flux.interval(catalogReloadInitialDelay, interval) + } + + disposable = + source + .onBackpressureDrop { log.warn("backpressure drop {}", it) } + .doOnNext { reload() } + .subscribeOn(Schedulers.boundedElastic()) + .onErrorContinue { error, _ -> + log.error( + "Error occurred during catalog reload or unexpected error: {}. Continuing with next interval.", + error.message, + error, + ) + }.subscribe() + } + + @Synchronized + override fun close() { + if (!bound) return + log.info("PeriodicCatalog closing after {} reloads", reloadCount) + disposable?.dispose() + disposable = null + engine = null + } + + fun reloadCount(): Long = reloadCount + + fun lastReloadAt(): Instant? = lastReloadAt + + private fun reload() { + // Guards against an in-flight tick that fires after `close()` + // nulled out `engine`. Phase 2 will read the catalog through + // `engine` here and swap `snapshot` atomically, reusing existing + // Table instances when their descriptors haven't changed. + if (engine == null) return + log.debug("reloading catalog") + // Phase 2: snapshot = Snapshot(freshDatabases, freshTables, freshAliases) + reloadCount++ + lastReloadAt = Instant.now() + } + + private data class Snapshot( + val databases: Map, + val tables: Map, + val aliases: Map, + ) { + companion object { + val EMPTY = Snapshot(emptyMap(), emptyMap(), emptyMap()) + } + } + + companion object { + private val log = LoggerFactory.getLogger(PeriodicCatalog::class.java) + } +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/binding/TableBinding.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Table.kt similarity index 92% rename from engine/src/main/kotlin/com/kakao/actionbase/engine/binding/TableBinding.kt rename to engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Table.kt index 2da302e5..c917d745 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/binding/TableBinding.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/catalog/Table.kt @@ -1,9 +1,10 @@ -package com.kakao.actionbase.engine.binding +package com.kakao.actionbase.engine.catalog import com.kakao.actionbase.core.edge.MutationKey import com.kakao.actionbase.core.edge.payload.DataFrameEdgeAggPayload import com.kakao.actionbase.core.edge.payload.DataFrameEdgeCountPayload import com.kakao.actionbase.core.edge.payload.DataFrameEdgePayload +import com.kakao.actionbase.core.metadata.TableDescriptor import com.kakao.actionbase.core.metadata.common.ModelSchema import com.kakao.actionbase.core.state.State import com.kakao.actionbase.engine.metadata.MutationMode @@ -11,7 +12,8 @@ import com.kakao.actionbase.v2.core.metadata.Direction import reactor.core.publisher.Mono -interface TableBinding { +interface Table { + val descriptor: TableDescriptor<*> val table: String val schema: ModelSchema val mutationMode: MutationMode diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt index e0b69720..ba72123a 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/MutationService.kt @@ -8,7 +8,7 @@ import com.kakao.actionbase.core.state.transit import com.kakao.actionbase.engine.Audit import com.kakao.actionbase.engine.MutationContext import com.kakao.actionbase.engine.MutationEngine -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.context.RequestContext import com.kakao.actionbase.engine.metadata.MutationMode import com.kakao.actionbase.engine.metadata.MutationModeContext @@ -35,7 +35,7 @@ class MutationService( ): Mono> = Mono .fromCallable { - val tb = engine.getTableBinding(database, alias) + val tb = engine.getTable(database, alias) val ctx = MutationContext( database = database, @@ -73,7 +73,7 @@ class MutationService( } private fun readModifyWrite( - tb: TableBinding, + tb: Table, key: MutationKey, sorted: List, acquireLock: Boolean, diff --git a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/QueryService.kt b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/QueryService.kt index d46692b0..d4aa2747 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/engine/service/QueryService.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/engine/service/QueryService.kt @@ -48,7 +48,7 @@ class QueryService( require(filters == null) { "`filters` is not yet supported in count query." } require(features.isEmpty()) { "`features` ${features.joinToString(", ")} are not supported in get query." } - return engine.getTableBinding(database, table).count(start.toSet(), direction) + return engine.getTable(database, table).count(start.toSet(), direction) } @Suppress("UnusedParameter") @@ -69,7 +69,7 @@ class QueryService( target.distinct().map { t -> s to t } } - return engine.getTableBinding(database, table).gets(keys, filters) + return engine.getTable(database, table).gets(keys, filters) } @Suppress("UnusedParameter") @@ -82,14 +82,14 @@ class QueryService( ): Mono { require(features.isEmpty()) { "`features` ${features.joinToString(", ")} are not supported in get query." } - val tb = engine.getTableBinding(database, table) + val t = engine.getTable(database, table) - require(tb.schema is com.kakao.actionbase.core.metadata.common.ModelSchema.MultiEdge) { + require(t.schema is com.kakao.actionbase.core.metadata.common.ModelSchema.MultiEdge) { "get query with ids is only supported for multi-edge tables." } val keys = ids.distinct().map { id -> id to id } - return tb.gets(keys, filters) + return t.gets(keys, filters) } fun scan( @@ -103,7 +103,7 @@ class QueryService( ranges: String? = null, filters: String? = null, features: List = emptyList(), - ): Mono = engine.getTableBinding(database, table).scan(index, start, direction, limit, offset, ranges, filters, features) + ): Mono = engine.getTable(database, table).scan(index, start, direction, limit, offset, ranges, filters, features) fun seek( database: String, @@ -113,7 +113,7 @@ class QueryService( direction: Direction, limit: Int = ScanFilter.defaultLimit, offset: String? = null, - ): Mono = engine.getTableBinding(database, table).seek(cache, start, direction, limit, offset) + ): Mono = engine.getTable(database, table).seek(cache, start, direction, limit, offset) fun agg( database: String, @@ -125,7 +125,7 @@ class QueryService( filters: String? = null, features: List = emptyList(), ttl: Long? = null, - ): Mono = engine.getTableBinding(database, table).agg(group, start, direction, ranges, filters, features, ttl) + ): Mono = engine.getTable(database, table).agg(group, start, direction, ranges, filters, features, ttl) fun query(request: ActionbaseQuery): Mono> = engine.query(request) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/hbase/HBaseIndexedLabel.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/hbase/HBaseIndexedLabel.kt index a45d1564..9ee0c599 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/hbase/HBaseIndexedLabel.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/label/hbase/HBaseIndexedLabel.kt @@ -3,7 +3,7 @@ package com.kakao.actionbase.v2.engine.label.hbase import com.kakao.actionbase.core.edge.Edge import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper import com.kakao.actionbase.core.edge.record.EdgeCacheRecord -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.v2.core.code.EdgeEncoder import com.kakao.actionbase.v2.core.code.IdEdgeEncoder import com.kakao.actionbase.v2.core.code.Index @@ -24,8 +24,8 @@ import com.kakao.actionbase.v2.engine.sql.ScanFilter import com.kakao.actionbase.v2.engine.sql.StatKey import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTables -import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding -import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding.Companion.toV3 +import com.kakao.actionbase.v2.engine.v3.V2BackedTable +import com.kakao.actionbase.v2.engine.v3.V2BackedTable.Companion.toV3 import com.kakao.actionbase.v2.engine.v3.V3TableDescriptor import reactor.core.publisher.Mono @@ -47,9 +47,9 @@ open class HBaseIndexedLabel( tables = tables, ), IndexedLabelMixin { - val tableBinding: TableBinding = - V2BackedTableBinding( - descriptor = V3TableDescriptor.create(entity), + val table: Table = + V2BackedTable( + v3 = V3TableDescriptor.create(entity), label = this, mapper = edgeRecordMapper, lockTimeout = lockTimeout, diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTableBinding.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTable.kt similarity index 75% rename from engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTableBinding.kt rename to engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTable.kt index b20e4470..579a2326 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTableBinding.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/NilTable.kt @@ -4,20 +4,24 @@ import com.kakao.actionbase.core.edge.MutationKey import com.kakao.actionbase.core.edge.payload.DataFrameEdgeAggPayload import com.kakao.actionbase.core.edge.payload.DataFrameEdgeCountPayload import com.kakao.actionbase.core.edge.payload.DataFrameEdgePayload +import com.kakao.actionbase.core.metadata.TableDescriptor import com.kakao.actionbase.core.metadata.common.ModelSchema import com.kakao.actionbase.core.state.State -import com.kakao.actionbase.engine.binding.MutationRecordsSummary -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.MutationRecordsSummary +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.metadata.MutationMode import com.kakao.actionbase.v2.core.metadata.Direction +import com.kakao.actionbase.v2.engine.entity.LabelEntity import reactor.core.publisher.Mono -class NilTableBinding( - descriptor: V3TableDescriptor, -) : TableBinding { - override val table: String = descriptor.table - override val schema: ModelSchema = descriptor.schema +class NilTable( + v3: V3TableDescriptor, + entity: LabelEntity, +) : Table { + override val descriptor: TableDescriptor<*> = v3.toTableDescriptor(entity) + override val table: String = v3.table + override val schema: ModelSchema = v3.schema override val mutationMode: MutationMode = MutationMode.SYNC override fun withLock( @@ -43,7 +47,7 @@ class NilTableBinding( override fun gets( keys: List>, filters: String?, - ): Mono = V2BackedTableBinding.EMPTY_EDGE_PAYLOAD + ): Mono = V2BackedTable.EMPTY_EDGE_PAYLOAD override fun scan( index: String, @@ -54,7 +58,7 @@ class NilTableBinding( ranges: String?, filters: String?, features: List, - ): Mono = V2BackedTableBinding.EMPTY_EDGE_PAYLOAD + ): Mono = V2BackedTable.EMPTY_EDGE_PAYLOAD override fun seek( cache: String, @@ -62,7 +66,7 @@ class NilTableBinding( direction: Direction, limit: Int, offset: String?, - ): Mono = V2BackedTableBinding.EMPTY_EDGE_PAYLOAD + ): Mono = V2BackedTable.EMPTY_EDGE_PAYLOAD override fun agg( group: String, diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedEngine.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedEngine.kt index 4b4c2740..f3f18aa4 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedEngine.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedEngine.kt @@ -7,7 +7,7 @@ import com.kakao.actionbase.core.state.State import com.kakao.actionbase.engine.MutationContext import com.kakao.actionbase.engine.MutationEngine import com.kakao.actionbase.engine.QueryEngine -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.metadata.MutationMode import com.kakao.actionbase.engine.query.ActionbaseQuery import com.kakao.actionbase.v2.engine.Graph @@ -26,20 +26,20 @@ class V2BackedEngine( private val graph: Graph, ) : MutationEngine, QueryEngine { - override fun getTableBinding( + override fun getTable( database: String, alias: String, - ): TableBinding { + ): Table { val label = graph.getLabel(EntityName(database, alias)) if (label is NilLabel) { - return NilTableBinding(V3TableDescriptor.create(label.entity)) + return NilTable(V3TableDescriptor.create(label.entity), label.entity) } if (label !is HBaseIndexedLabel) { throw UnsupportedOperationException( "This Label (${label.entity.fullName}, ${label.javaClass}) is not indexed or not supported for edge mutation", ) } - return label.tableBinding + return label.table } override fun query(request: ActionbaseQuery): Mono> = graph.query(request) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBinding.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTable.kt similarity index 98% rename from engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBinding.kt rename to engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTable.kt index 91c11178..dd0bb29e 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBinding.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTable.kt @@ -18,13 +18,14 @@ import com.kakao.actionbase.core.edge.record.EdgeCacheRecord import com.kakao.actionbase.core.edge.record.EdgeGroupRecord import com.kakao.actionbase.core.edge.record.EdgeStateRecord import com.kakao.actionbase.core.java.codec.common.hbase.Order +import com.kakao.actionbase.core.metadata.TableDescriptor import com.kakao.actionbase.core.metadata.common.Group import com.kakao.actionbase.core.metadata.common.ModelSchema import com.kakao.actionbase.core.state.SpecialStateValue import com.kakao.actionbase.core.state.State import com.kakao.actionbase.core.storage.HBaseRecord -import com.kakao.actionbase.engine.binding.MutationRecordsSummary -import com.kakao.actionbase.engine.binding.TableBinding +import com.kakao.actionbase.engine.catalog.MutationRecordsSummary +import com.kakao.actionbase.engine.catalog.Table import com.kakao.actionbase.engine.metadata.MutationMode import com.kakao.actionbase.v2.core.code.CryptoUtils import com.kakao.actionbase.v2.core.edge.Edge @@ -46,14 +47,15 @@ import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers -class V2BackedTableBinding( - private val descriptor: V3TableDescriptor, +class V2BackedTable( + v3: V3TableDescriptor, private val label: HBaseIndexedLabel, private val mapper: EdgeRecordMapper, private val lockTimeout: Long, -) : TableBinding { - override val table: String = descriptor.table - override val schema: ModelSchema = descriptor.schema +) : Table { + override val descriptor: TableDescriptor<*> = v3.toTableDescriptor(label.entity) + override val table: String = v3.table + override val schema: ModelSchema = v3.schema override val mutationMode: MutationMode = MutationMode.valueOf(label.entity.mode.name) private val groupRecordMapper = mapper.group @@ -609,7 +611,7 @@ class V2BackedTableBinding( } companion object { - private val log = LoggerFactory.getLogger(V2BackedTableBinding::class.java) + private val log = LoggerFactory.getLogger(V2BackedTable::class.java) private const val SELECT_COUNT_FIELD = "COUNT(1)" private const val TS_FIELD = "ts" diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3TableDescriptor.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3TableDescriptor.kt index 4ccce2fa..1d553707 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3TableDescriptor.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3TableDescriptor.kt @@ -1,5 +1,8 @@ package com.kakao.actionbase.v2.engine.v3 +import com.kakao.actionbase.core.metadata.common.MutationMode as V3MutationMode + +import com.kakao.actionbase.core.metadata.TableDescriptor import com.kakao.actionbase.core.metadata.common.Cache import com.kakao.actionbase.core.metadata.common.Field import com.kakao.actionbase.core.metadata.common.Group @@ -15,6 +18,7 @@ import com.kakao.actionbase.v2.core.types.DataType import com.kakao.actionbase.v2.core.types.EdgeSchema import com.kakao.actionbase.v2.core.types.VertexField import com.kakao.actionbase.v2.core.types.VertexType +import com.kakao.actionbase.v2.engine.entity.EntityName import com.kakao.actionbase.v2.engine.entity.LabelEntity sealed class V3TableDescriptor { @@ -22,17 +26,43 @@ sealed class V3TableDescriptor { abstract val table: String abstract val schema: ModelSchema + abstract fun toTableDescriptor(entity: LabelEntity): TableDescriptor<*> + data class Edge( override val database: String, override val table: String, override val schema: ModelSchema.Edge, - ) : V3TableDescriptor() + ) : V3TableDescriptor() { + override fun toTableDescriptor(entity: LabelEntity): TableDescriptor.Edge = + TableDescriptor.Edge( + tenant = EntityName.tenant, + database = database, + table = table, + schema = schema, + mode = V3MutationMode.valueOf(entity.mode.name), + storage = entity.storage, + active = entity.active, + comment = entity.desc, + ) + } data class MultiEdge( override val database: String, override val table: String, override val schema: ModelSchema.MultiEdge, - ) : V3TableDescriptor() + ) : V3TableDescriptor() { + override fun toTableDescriptor(entity: LabelEntity): TableDescriptor.MultiEdge = + TableDescriptor.MultiEdge( + tenant = EntityName.tenant, + database = database, + table = table, + schema = schema, + mode = V3MutationMode.valueOf(entity.mode.name), + storage = entity.storage, + active = entity.active, + comment = entity.desc, + ) + } companion object { fun create(entity: LabelEntity): V3TableDescriptor { diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/EngineTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/EngineTest.kt new file mode 100644 index 00000000..dcb64b25 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/EngineTest.kt @@ -0,0 +1,62 @@ +package com.kakao.actionbase.engine + +import com.kakao.actionbase.core.metadata.DatabaseId +import com.kakao.actionbase.core.metadata.TableId +import com.kakao.actionbase.engine.catalog.Alias +import com.kakao.actionbase.engine.catalog.Catalog +import com.kakao.actionbase.engine.catalog.Database +import com.kakao.actionbase.engine.catalog.Table + +import kotlin.test.assertEquals +import kotlin.test.assertSame + +import org.junit.jupiter.api.Test + +class EngineTest { + @Test + fun `construction binds loader to self exactly once`() { + val loader = FakeLoader() + val engine = Engine(loader) + assertEquals(1, loader.bindCount) + assertSame(engine, loader.bound) + } + + @Test + fun `close delegates to loader`() { + val loader = FakeLoader() + val engine = Engine(loader) + engine.close() + assertEquals(1, loader.closeCount) + } + + @Test + fun `try-with-resources closes the loader`() { + val loader = FakeLoader() + Engine(loader).use { /* no-op */ } + assertEquals(1, loader.closeCount) + } + + @Test + fun `create wires defaults without throwing`() { + Engine.create().close() + } + + private class FakeLoader : Catalog { + var bound: Engine? = null + var bindCount = 0 + var closeCount = 0 + + override val databases: Map = emptyMap() + override val tables: Map = emptyMap() + override val aliases: Map = emptyMap() + + override fun bind(engine: Engine) { + bound = engine + bindCount++ + } + + override fun close() { + closeCount++ + } + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalogTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalogTest.kt new file mode 100644 index 00000000..9a198087 --- /dev/null +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/catalog/PeriodicCatalogTest.kt @@ -0,0 +1,103 @@ +package com.kakao.actionbase.engine.catalog + +import com.kakao.actionbase.engine.Engine + +import java.time.Duration + +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +import org.junit.jupiter.api.Test + +class PeriodicCatalogTest { + @Test + fun `periodic loop fires on schedule`() { + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay = Duration.ZERO, + catalogReloadInterval = Duration.ofMillis(20), + ) + Engine(catalog).use { + waitUntil(Duration.ofSeconds(2)) { catalog.reloadCount() >= 3 } + assertTrue(catalog.reloadCount() >= 3) + assertNotNull(catalog.lastReloadAt()) + } + } + + @Test + fun `close halts the loop`() { + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay = Duration.ZERO, + catalogReloadInterval = Duration.ofMillis(20), + ) + val engine = Engine(catalog) + waitUntil(Duration.ofSeconds(1)) { catalog.reloadCount() >= 1 } + engine.close() + Thread.sleep(50) + val snapshot = catalog.reloadCount() + Thread.sleep(100) + assertTrue(catalog.reloadCount() == snapshot, "reloadCount should not advance after close") + } + + @Test + fun `bind triggers exactly one reload before the periodic loop kicks in`() { + // A long interval lets us observe the initial reload alone. + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay = Duration.ZERO, + catalogReloadInterval = Duration.ofMinutes(10), + ) + Engine(catalog).use { + waitUntil(Duration.ofSeconds(1)) { catalog.reloadCount() == 1L } + // Window between the initial reload and the next periodic tick + // should stay at exactly 1. + Thread.sleep(50) + assertEquals(1, catalog.reloadCount()) + } + } + + @Test + fun `null interval still runs the initial reload`() { + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay = Duration.ZERO, + catalogReloadInterval = null, + ) + Engine(catalog).use { + waitUntil(Duration.ofSeconds(1)) { catalog.reloadCount() == 1L } + // No periodic loop, so the count must stay at 1 forever. + Thread.sleep(50) + assertEquals(1, catalog.reloadCount()) + } + } + + @Test + fun `initial delay defers the first reload`() { + val catalog = + PeriodicCatalog( + catalogReloadInitialDelay = Duration.ofMillis(300), + catalogReloadInterval = null, + ) + Engine(catalog).use { + // Before the delay expires, the reload has not fired yet. + Thread.sleep(100) + assertEquals(0, catalog.reloadCount()) + // After the delay, the one-shot reload arrives. + waitUntil(Duration.ofSeconds(2)) { catalog.reloadCount() == 1L } + } + } + + private fun waitUntil( + timeout: Duration, + condition: () -> Boolean, + ) { + val deadline = System.currentTimeMillis() + timeout.toMillis() + while (System.currentTimeMillis() < deadline) { + if (condition()) return + Thread.sleep(10) + } + throw AssertionError("condition not met within $timeout") + } +} diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBindingTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableTest.kt similarity index 98% rename from engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBindingTest.kt rename to engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableTest.kt index 50112d76..78476d72 100644 --- a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableBindingTest.kt +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/v3/V2BackedTableTest.kt @@ -9,8 +9,8 @@ import com.kakao.actionbase.core.state.SpecialStateValue import com.kakao.actionbase.core.state.State import com.kakao.actionbase.core.state.StateValue import com.kakao.actionbase.v2.core.code.hbase.Constants -import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding.Companion.mergeQualifiers -import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding.Companion.specialStateValueToNull +import com.kakao.actionbase.v2.engine.v3.V2BackedTable.Companion.mergeQualifiers +import com.kakao.actionbase.v2.engine.v3.V2BackedTable.Companion.specialStateValueToNull import kotlin.test.assertEquals import kotlin.test.assertNull @@ -21,7 +21,7 @@ import org.apache.hadoop.hbase.client.Put import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test -class V2BackedTableBindingTest { +class V2BackedTableTest { @Nested inner class MergeQualifiers { private fun groupRecord( @@ -99,7 +99,7 @@ class V2BackedTableBindingTest { /** * Verifies that EdgeCacheRecord encodes into HBase Put/Delete correctly, - * matching the pattern used in V2BackedTableBinding.buildHBaseMutations(). + * matching the pattern used in V2BackedTable.buildHBaseMutations(). */ @Nested inner class CacheHBaseMutations { diff --git a/server/src/main/kotlin/com/kakao/actionbase/server/configuration/EngineConfiguration.kt b/server/src/main/kotlin/com/kakao/actionbase/server/configuration/EngineConfiguration.kt new file mode 100644 index 00000000..131fd220 --- /dev/null +++ b/server/src/main/kotlin/com/kakao/actionbase/server/configuration/EngineConfiguration.kt @@ -0,0 +1,16 @@ +package com.kakao.actionbase.server.configuration + +import com.kakao.actionbase.engine.Engine + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +@Configuration +class EngineConfiguration { + @Bean(destroyMethod = "close") + fun engine(properties: ServerProperties): Engine = + Engine.create( + catalogReloadInitialDelay = properties.catalog.reloadInitialDelay, + catalogReloadInterval = properties.catalog.reloadInterval, + ) +} diff --git a/server/src/main/kotlin/com/kakao/actionbase/server/configuration/ServerProperties.kt b/server/src/main/kotlin/com/kakao/actionbase/server/configuration/ServerProperties.kt index 77fab174..61853ae0 100644 --- a/server/src/main/kotlin/com/kakao/actionbase/server/configuration/ServerProperties.kt +++ b/server/src/main/kotlin/com/kakao/actionbase/server/configuration/ServerProperties.kt @@ -3,6 +3,8 @@ package com.kakao.actionbase.server.configuration import com.kakao.actionbase.core.metadata.DatastoreDescriptor import com.kakao.actionbase.core.metadata.common.DatastoreType +import java.time.Duration + import org.springframework.boot.context.properties.ConfigurationProperties // NOTE: If DatastoreProperties is placed in a submodule, IntelliJ's application.yaml -> code navigation does not work. @@ -12,7 +14,13 @@ data class ServerProperties( val tenant: String, val datastore: DatastoreProperties, val readOnly: Boolean = false, + val catalog: CatalogProperties = CatalogProperties(), ) { + data class CatalogProperties( + val reloadInitialDelay: Duration = Duration.ZERO, + val reloadInterval: Duration? = null, + ) + data class DatastoreProperties( val type: DatastoreType, val configuration: Map = emptyMap(), diff --git a/server/src/main/resources/application.yaml b/server/src/main/resources/application.yaml index 5e8de487..34b7e93a 100644 --- a/server/src/main/resources/application.yaml +++ b/server/src/main/resources/application.yaml @@ -16,6 +16,11 @@ actionbase: tenant: ${AB_TENANT:ab-none} datastore: type: memory + catalog: + # Phase 1 of #247 — runs alongside kc.graph.metastoreReloadInterval + # until the v3 engine takes over the query path. + reload-initial-delay: 0s + reload-interval: 1m kc: graph: