Skip to content
32 changes: 32 additions & 0 deletions engine/src/main/kotlin/com/kakao/actionbase/engine/Engine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.kakao.actionbase.engine

import com.kakao.actionbase.engine.catalog.Catalog
import com.kakao.actionbase.engine.catalog.PeriodicCatalog

import java.time.Duration

class Engine(
private val catalog: Catalog,
) : AutoCloseable {
init {
catalog.bind(this)
}

override fun close() {
catalog.close()
}

companion object {
fun create(
catalogReloadInitialDelay: Duration = Duration.ZERO,
catalogReloadInterval: Duration? = null,
): Engine {
val catalog =
PeriodicCatalog(
catalogReloadInitialDelay,
catalogReloadInterval,
)
return Engine(catalog)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.kakao.actionbase.engine

import com.kakao.actionbase.core.edge.MutationEvent
import com.kakao.actionbase.core.state.State
import com.kakao.actionbase.engine.binding.TableBinding
import com.kakao.actionbase.engine.catalog.Table
import com.kakao.actionbase.engine.metadata.MutationMode

import reactor.core.publisher.Mono
Expand All @@ -13,12 +13,12 @@ import reactor.core.publisher.Mono
*/
interface MutationEngine {
/**
* Resolves a table binding from a database/alias pair.
* Resolves a table from a database/alias pair.
*/
fun getTableBinding(
fun getTable(
database: String,
alias: String,
): TableBinding
): Table

fun writeWal(
ctx: MutationContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.kakao.actionbase.engine

import com.kakao.actionbase.engine.binding.TableBinding
import com.kakao.actionbase.engine.catalog.Table
import com.kakao.actionbase.engine.query.ActionbaseQuery
import com.kakao.actionbase.v2.engine.sql.DataFrame

Expand All @@ -11,10 +11,10 @@ import reactor.core.publisher.Mono
* Decouples QueryService from Graph by exposing only the operations it needs.
*/
interface QueryEngine {
fun getTableBinding(
fun getTable(
database: String,
alias: String,
): TableBinding
): Table

fun query(request: ActionbaseQuery): Mono<Map<String, DataFrame>>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.kakao.actionbase.engine.catalog

import com.kakao.actionbase.core.metadata.AliasDescriptor

interface Alias {
val descriptor: AliasDescriptor

/** The table this alias resolves to, already materialized. */
val table: Table
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.kakao.actionbase.engine.catalog

import com.kakao.actionbase.core.metadata.DatabaseId
import com.kakao.actionbase.core.metadata.TableId
import com.kakao.actionbase.engine.Engine

interface Catalog : AutoCloseable {
fun bind(engine: Engine)

val databases: Map<DatabaseId, Database>

val tables: Map<TableId, Table>

val aliases: Map<TableId, Alias>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.kakao.actionbase.engine.catalog

import com.kakao.actionbase.core.metadata.DatabaseDescriptor

interface Database {
val descriptor: DatabaseDescriptor
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.kakao.actionbase.engine.catalog

import com.kakao.actionbase.core.metadata.DatabaseId
import com.kakao.actionbase.core.metadata.TableId
import com.kakao.actionbase.engine.Engine

import java.time.Duration
import java.time.Instant

import org.slf4j.LoggerFactory

import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

class PeriodicCatalog(
private val catalogReloadInitialDelay: Duration,
private val catalogReloadInterval: Duration?,
) : Catalog {
// --- state ---
@Volatile private var snapshot: Snapshot = Snapshot.EMPTY

@Volatile private var reloadCount: Long = 0

@Volatile private var lastReloadAt: Instant? = null

@Volatile private var engine: Engine? = null

private var bound = false

private var disposable: Disposable? = null

// --- public views ---
override val databases: Map<DatabaseId, Database> get() = snapshot.databases
override val tables: Map<TableId, Table> get() = snapshot.tables
override val aliases: Map<TableId, Alias> get() = snapshot.aliases

@Synchronized
override fun bind(engine: Engine) {
if (bound) {
log.warn("PeriodicCatalog already bound")
return
}
this.engine = engine
bound = true

val interval = catalogReloadInterval
if (interval == null) {
log.info(
"catalog periodic reload disabled; one-shot reload after {} ms.",
catalogReloadInitialDelay.toMillis(),
)
} else {
log.info(
"Starting Flux.interval for reloading catalog every {} ms after {} ms delay.",
interval.toMillis(),
catalogReloadInitialDelay.toMillis(),
)
}

val source: Flux<Long> =
if (interval == null) {
Mono.delay(catalogReloadInitialDelay).flux()
} else {
Flux.interval(catalogReloadInitialDelay, interval)
}

disposable =
source
.onBackpressureDrop { log.warn("backpressure drop {}", it) }
.doOnNext { reload() }
.subscribeOn(Schedulers.boundedElastic())
.onErrorContinue { error, _ ->
log.error(
"Error occurred during catalog reload or unexpected error: {}. Continuing with next interval.",
error.message,
error,
)
}.subscribe()
}

@Synchronized
override fun close() {
if (!bound) return
log.info("PeriodicCatalog closing after {} reloads", reloadCount)
disposable?.dispose()
disposable = null
engine = null
}

fun reloadCount(): Long = reloadCount

fun lastReloadAt(): Instant? = lastReloadAt

private fun reload() {
// Guards against an in-flight tick that fires after `close()`
// nulled out `engine`. Phase 2 will read the catalog through
// `engine` here and swap `snapshot` atomically, reusing existing
// Table instances when their descriptors haven't changed.
if (engine == null) return
log.debug("reloading catalog")
// Phase 2: snapshot = Snapshot(freshDatabases, freshTables, freshAliases)
reloadCount++
lastReloadAt = Instant.now()
}

private data class Snapshot(
val databases: Map<DatabaseId, Database>,
val tables: Map<TableId, Table>,
val aliases: Map<TableId, Alias>,
) {
companion object {
val EMPTY = Snapshot(emptyMap(), emptyMap(), emptyMap())
}
}

companion object {
private val log = LoggerFactory.getLogger(PeriodicCatalog::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.kakao.actionbase.engine.binding
package com.kakao.actionbase.engine.catalog

import com.kakao.actionbase.core.edge.MutationKey
import com.kakao.actionbase.core.edge.payload.DataFrameEdgeAggPayload
import com.kakao.actionbase.core.edge.payload.DataFrameEdgeCountPayload
import com.kakao.actionbase.core.edge.payload.DataFrameEdgePayload
import com.kakao.actionbase.core.metadata.TableDescriptor
import com.kakao.actionbase.core.metadata.common.ModelSchema
import com.kakao.actionbase.core.state.State
import com.kakao.actionbase.engine.metadata.MutationMode
import com.kakao.actionbase.v2.core.metadata.Direction

import reactor.core.publisher.Mono

interface TableBinding {
interface Table {
val descriptor: TableDescriptor<*>
val table: String
val schema: ModelSchema
val mutationMode: MutationMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.kakao.actionbase.core.state.transit
import com.kakao.actionbase.engine.Audit
import com.kakao.actionbase.engine.MutationContext
import com.kakao.actionbase.engine.MutationEngine
import com.kakao.actionbase.engine.binding.TableBinding
import com.kakao.actionbase.engine.catalog.Table
import com.kakao.actionbase.engine.context.RequestContext
import com.kakao.actionbase.engine.metadata.MutationMode
import com.kakao.actionbase.engine.metadata.MutationModeContext
Expand All @@ -35,7 +35,7 @@ class MutationService(
): Mono<List<MutationResult>> =
Mono
.fromCallable {
val tb = engine.getTableBinding(database, alias)
val tb = engine.getTable(database, alias)
val ctx =
MutationContext(
database = database,
Expand Down Expand Up @@ -73,7 +73,7 @@ class MutationService(
}

private fun readModifyWrite(
tb: TableBinding,
tb: Table,
key: MutationKey,
sorted: List<MutationEvent>,
acquireLock: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class QueryService(
require(filters == null) { "`filters` is not yet supported in count query." }
require(features.isEmpty()) { "`features` ${features.joinToString(", ")} are not supported in get query." }

return engine.getTableBinding(database, table).count(start.toSet(), direction)
return engine.getTable(database, table).count(start.toSet(), direction)
}

@Suppress("UnusedParameter")
Expand All @@ -69,7 +69,7 @@ class QueryService(
target.distinct().map { t -> s to t }
}

return engine.getTableBinding(database, table).gets(keys, filters)
return engine.getTable(database, table).gets(keys, filters)
}

@Suppress("UnusedParameter")
Expand All @@ -82,14 +82,14 @@ class QueryService(
): Mono<DataFrameEdgePayload> {
require(features.isEmpty()) { "`features` ${features.joinToString(", ")} are not supported in get query." }

val tb = engine.getTableBinding(database, table)
val t = engine.getTable(database, table)

require(tb.schema is com.kakao.actionbase.core.metadata.common.ModelSchema.MultiEdge) {
require(t.schema is com.kakao.actionbase.core.metadata.common.ModelSchema.MultiEdge) {
"get query with ids is only supported for multi-edge tables."
}

val keys = ids.distinct().map { id -> id to id }
return tb.gets(keys, filters)
return t.gets(keys, filters)
}

fun scan(
Expand All @@ -103,7 +103,7 @@ class QueryService(
ranges: String? = null,
filters: String? = null,
features: List<String> = emptyList(),
): Mono<DataFrameEdgePayload> = engine.getTableBinding(database, table).scan(index, start, direction, limit, offset, ranges, filters, features)
): Mono<DataFrameEdgePayload> = engine.getTable(database, table).scan(index, start, direction, limit, offset, ranges, filters, features)

fun seek(
database: String,
Expand All @@ -113,7 +113,7 @@ class QueryService(
direction: Direction,
limit: Int = ScanFilter.defaultLimit,
offset: String? = null,
): Mono<DataFrameEdgePayload> = engine.getTableBinding(database, table).seek(cache, start, direction, limit, offset)
): Mono<DataFrameEdgePayload> = engine.getTable(database, table).seek(cache, start, direction, limit, offset)

fun agg(
database: String,
Expand All @@ -125,7 +125,7 @@ class QueryService(
filters: String? = null,
features: List<String> = emptyList(),
ttl: Long? = null,
): Mono<DataFrameEdgeAggPayload> = engine.getTableBinding(database, table).agg(group, start, direction, ranges, filters, features, ttl)
): Mono<DataFrameEdgeAggPayload> = engine.getTable(database, table).agg(group, start, direction, ranges, filters, features, ttl)

fun query(request: ActionbaseQuery): Mono<Map<String, DataFrame>> = engine.query(request)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.kakao.actionbase.v2.engine.label.hbase
import com.kakao.actionbase.core.edge.Edge
import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper
import com.kakao.actionbase.core.edge.record.EdgeCacheRecord
import com.kakao.actionbase.engine.binding.TableBinding
import com.kakao.actionbase.engine.catalog.Table
import com.kakao.actionbase.v2.core.code.EdgeEncoder
import com.kakao.actionbase.v2.core.code.IdEdgeEncoder
import com.kakao.actionbase.v2.core.code.Index
Expand All @@ -24,8 +24,8 @@ import com.kakao.actionbase.v2.engine.sql.ScanFilter
import com.kakao.actionbase.v2.engine.sql.StatKey
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTables
import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding
import com.kakao.actionbase.v2.engine.v3.V2BackedTableBinding.Companion.toV3
import com.kakao.actionbase.v2.engine.v3.V2BackedTable
import com.kakao.actionbase.v2.engine.v3.V2BackedTable.Companion.toV3
import com.kakao.actionbase.v2.engine.v3.V3TableDescriptor

import reactor.core.publisher.Mono
Expand All @@ -47,9 +47,9 @@ open class HBaseIndexedLabel(
tables = tables,
),
IndexedLabelMixin<ByteArray> {
val tableBinding: TableBinding =
V2BackedTableBinding(
descriptor = V3TableDescriptor.create(entity),
val table: Table =
V2BackedTable(
v3 = V3TableDescriptor.create(entity),
label = this,
mapper = edgeRecordMapper,
lockTimeout = lockTimeout,
Expand Down
Loading
Loading