diff --git a/engine/build.gradle.kts b/engine/build.gradle.kts index fe480262..30788970 100644 --- a/engine/build.gradle.kts +++ b/engine/build.gradle.kts @@ -51,8 +51,8 @@ dependencies { implementation(Dependencies.Logging.SLF4J_API) implementation(Dependencies.Logging.LOGBACK_CLASSIC) - // SlateDB (Maven Central) - implementation("io.slatedb:slatedb:0.11.0") + // SlateDB (Maven Central, UniFFI bindings) + implementation("io.slatedb:slatedb-uniffi:0.12.1") // HBase implementation(Dependencies.HBase.CLIENT) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt index 3dda4dd4..6063025f 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbConnections.kt @@ -3,29 +3,15 @@ package com.kakao.actionbase.v2.engine.storage.slatedb import com.kakao.actionbase.v2.engine.util.getLogger import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicBoolean -import io.slatedb.SlateDb -import io.slatedb.SlateDbConfig +import io.slatedb.uniffi.DbBuilder +import io.slatedb.uniffi.LogLevel +import io.slatedb.uniffi.ObjectStore +import io.slatedb.uniffi.Slatedb import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers -// The SlateDB C library uses a single global Tokio runtime that does not support -// concurrent block_on calls from multiple threads. All native FFI calls are routed -// through this global single-thread scheduler to prevent concurrent runtime entries. -// -// Uses Schedulers.fromExecutorService rather than Schedulers.newSingle to avoid -// marking the worker thread as Reactor NonBlocking. NonBlocking threads are -// monitored by BlockHound, which conflicts with the intentional blocking FFI calls. -object SlateDbScheduler { - val INSTANCE: reactor.core.scheduler.Scheduler = - Schedulers.fromExecutorService( - Executors.newSingleThreadExecutor { r -> Thread(r, "slatedb-worker") }, - "slatedb-worker", - ) -} - object SlateDbConnections { private val logger = getLogger() @@ -34,8 +20,11 @@ object SlateDbConnections { fun ensureInitialized() { if (initialized.compareAndSet(false, true)) { - logger.info("Initializing SlateDB (native library loaded from JAR classpath)") - SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) + extractSlateDbNativeLibrary() + logger.info("Initializing SlateDB (UniFFI native library loaded from JAR classpath)") + // The second argument is an optional foreign log callback; null routes + // log records to SlateDB's default tracing formatter on stderr. + Slatedb.initLogging(LogLevel.INFO, null) } } @@ -47,18 +36,21 @@ object SlateDbConnections { return connections.computeIfAbsent(cacheKey) { key -> Mono - .fromCallable { + .fromFuture { ensureInitialized() - val db = - SlateDb.builder(dbPath, url, null).use { builder -> + ObjectStore.resolve(url).use { objectStore -> + DbBuilder(dbPath, objectStore).use { builder -> builder.withMergeOperator(incrementMergeOperator) builder.build() } - SlateDbTable.create(db) - }.subscribeOn(Schedulers.boundedElastic()) - .doOnSuccess { - logger.info("Successfully opened SlateDB connection for cacheKey: {}", key) - }.doOnError { error -> + } + } + // ObjectStore.resolve and the synchronous DbBuilder calls touch the + // foreign runtime; route them off any reactor event loop. + .subscribeOn(Schedulers.boundedElastic()) + .map { db -> SlateDbTable.create(db) } + .doOnSuccess { logger.info("Successfully opened SlateDB connection for cacheKey: {}", key) } + .doOnError { error -> logger.error("Failed to open SlateDB connection for cacheKey: {}", key, error) connections.remove(key) }.cache() @@ -73,20 +65,13 @@ object SlateDbConnections { fun closeConnections(): Mono { val closeMonos = connections.entries.map { (key, tableMono) -> - tableMono - .flatMap { table -> - Mono - .fromRunnable { - try { - table.close() - logger.info("Closed SlateDB connection for cacheKey: {}", key) - } catch (e: Exception) { - logger.error("Error closing SlateDB connection for cacheKey: {}", key, e) - } - // Close on the same single-thread scheduler so it is enqueued - // after all pending FFI operations, preventing use-after-close. - }.subscribeOn(SlateDbScheduler.INSTANCE) - } + tableMono.flatMap { table -> + table + .close() + .doOnSuccess { logger.info("Closed SlateDB connection for cacheKey: {}", key) } + .doOnError { error -> logger.error("Error closing SlateDB connection for cacheKey: {}", key, error) } + .onErrorResume { Mono.empty() } + } } return Mono.`when`(closeMonos).doFinally { connections.clear() } } diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbNativeLibrary.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbNativeLibrary.kt new file mode 100644 index 00000000..18acaa11 --- /dev/null +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbNativeLibrary.kt @@ -0,0 +1,49 @@ +package com.kakao.actionbase.v2.engine.storage.slatedb + +import java.nio.file.Files +import java.nio.file.StandardCopyOption + +// Workaround for io.slatedb:slatedb-uniffi:0.12.x. The Java bindings are +// generated by IronCoreLabs/uniffi-bindgen-java, whose FFM loader resolves +// the library via `System.loadLibrary` / `System.load` and intentionally does +// not extract bundled natives from the classpath — consumers pass an absolute +// path via the `uniffi.component..libraryOverride` system property +// (see IronCoreLabs/uniffi-bindgen-java#58 and #59). The slatedb jar happens +// to ship the libraries as JNA-style resources, but no loader uses them. +// +// This helper materializes the OS/arch-specific resource to a temp file and +// sets that property. Delete this file if upstream ever ships a +// classpath-aware loader. +private const val LIBRARY_OVERRIDE_PROPERTY = "uniffi.component.slatedb.libraryOverride" + +internal fun extractSlateDbNativeLibrary() { + // Skip if the user already pinned the library path (e.g. for local dev + // against a built-from-source dylib). + if (System.getProperty(LIBRARY_OVERRIDE_PROPERTY) != null) return + + val osName = System.getProperty("os.name").lowercase() + val osArch = System.getProperty("os.arch").lowercase() + val (osDir, libFileName) = + when { + osName.contains("mac") || osName.contains("darwin") -> "darwin" to "libslatedb_uniffi.dylib" + osName.contains("linux") -> "linux" to "libslatedb_uniffi.so" + osName.contains("windows") -> "win32" to "slatedb_uniffi.dll" + else -> error("Unsupported OS for SlateDB UniFFI native library: $osName") + } + val archDir = + when { + osArch.contains("aarch64") || osArch.contains("arm64") -> "aarch64" + osArch.contains("amd64") || osArch.contains("x86_64") || osArch == "x86-64" -> "x86-64" + else -> error("Unsupported architecture for SlateDB UniFFI native library: $osArch") + } + val resourcePath = "$osDir-$archDir/$libFileName" + val input = + Thread.currentThread().contextClassLoader.getResourceAsStream(resourcePath) + ?: error("SlateDB UniFFI native library not bundled at classpath resource $resourcePath") + + val suffix = "." + libFileName.substringAfterLast('.') + val tempFile = Files.createTempFile("slatedb_uniffi", suffix) + tempFile.toFile().deleteOnExit() + input.use { Files.copy(it, tempFile, StandardCopyOption.REPLACE_EXISTING) } + System.setProperty(LIBRARY_OVERRIDE_PROPERTY, tempFile.toAbsolutePath().toString()) +} diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt index ed4b5e75..0c359ee4 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTable.kt @@ -3,9 +3,10 @@ package com.kakao.actionbase.v2.engine.storage.slatedb import java.nio.ByteBuffer import java.nio.ByteOrder -import io.slatedb.SlateDb -import io.slatedb.SlateDbKeyValue -import io.slatedb.SlateDbMergeOperator +import io.slatedb.uniffi.Db +import io.slatedb.uniffi.DbIterator +import io.slatedb.uniffi.MergeOperator +import io.slatedb.uniffi.WriteBatch import reactor.core.publisher.Mono fun Long.toSlateBytes(): ByteArray = @@ -18,7 +19,7 @@ fun Long.toSlateBytes(): ByteArray = fun ByteArray.toLong(): Long = ByteBuffer.wrap(this).order(ByteOrder.BIG_ENDIAN).long val incrementMergeOperator = - SlateDbMergeOperator { _, existingValue, operand -> + MergeOperator { _, existingValue, operand -> val current = existingValue?.toLong() ?: 0L val delta = operand.toLong() (current + delta).toSlateBytes() @@ -40,7 +41,7 @@ sealed class BatchOperation { ) : BatchOperation() } -interface SlateDbTable : AutoCloseable { +interface SlateDbTable { fun get(key: ByteArray): Mono fun put( @@ -64,91 +65,92 @@ interface SlateDbTable : AutoCloseable { fun batch(operations: List): Mono + fun close(): Mono + companion object { - fun create(db: SlateDb): SlateDbTable = SlateDbTableImpl(db) + fun create(db: Db): SlateDbTable = SlateDbTableImpl(db) } } internal class SlateDbTableImpl( - private val db: SlateDb, + private val db: Db, ) : SlateDbTable { - // The SlateDB C library uses a single global Tokio runtime with block_on, which - // does not support concurrent calls from multiple threads. The global single-thread - // scheduler serializes all native FFI calls across all database instances. - private val scheduler = SlateDbScheduler.INSTANCE + // All `Mono.fromFuture { db.(..) }` calls use the Supplier overload so + // the underlying CompletableFuture is created at subscription time, not + // when the Mono is constructed. The eager overload would start every + // operation in a chain like `put.then(delete).then(get)` immediately, + // racing them against each other. override fun get(key: ByteArray): Mono = - Mono - .fromCallable { db.get(key) } - .flatMap { Mono.justOrEmpty(it) } - .subscribeOn(scheduler) + Mono.fromFuture { db.get(key) }.flatMap { Mono.justOrEmpty(it) } override fun put( key: ByteArray, value: ByteArray, - ): Mono = - Mono - .fromCallable { db.put(key, value) } - .subscribeOn(scheduler) - .then() + ): Mono = Mono.fromFuture { db.put(key, value) }.then() - override fun delete(key: ByteArray): Mono = - Mono - .fromCallable { db.delete(key) } - .subscribeOn(scheduler) - .then() + override fun delete(key: ByteArray): Mono = Mono.fromFuture { db.delete(key) }.then() override fun merge( key: ByteArray, value: ByteArray, - ): Mono = - Mono - .fromCallable { db.merge(key, value) } - .subscribeOn(scheduler) - .then() + ): Mono = Mono.fromFuture { db.merge(key, value) }.then() - override fun flush(): Mono = - Mono - .fromCallable { db.flush() } - .subscribeOn(scheduler) - .then() + override fun flush(): Mono = Mono.fromFuture { db.flush() } override fun scanPrefix( prefix: ByteArray, limit: Int, ): Mono>> = Mono - .fromCallable { + .fromFuture { db.scanPrefix(prefix) } + .flatMap { iterator -> drainIterator(iterator, limit) } + + // DbIterator.next() returns CompletableFuture; null marks end-of-stream. + // Drain by chaining continuations rather than .get(), so the calling thread is + // never blocked while UniFFI's Tokio runtime fetches the next block. + private fun drainIterator( + iterator: DbIterator, + limit: Int, + ): Mono>> = + Mono + .create>> { sink -> val results = mutableListOf>() - db.scanPrefix(prefix).use { iterator -> - var kv: SlateDbKeyValue? = iterator.next() - var count = 0 - while (kv != null && count < limit) { - results.add(kv.key() to kv.value()) - count++ - kv = iterator.next() + fun pump() { + if (results.size >= limit) { + sink.success(results.toList()) + return + } + iterator.next().whenComplete { kv, err -> + when { + err != null -> sink.error(err) + kv == null -> sink.success(results.toList()) + else -> { + results.add(kv.key() to kv.value()) + pump() + } + } } } - results.toList() - }.subscribeOn(scheduler) + pump() + }.doFinally { iterator.close() } override fun batch(operations: List): Mono = Mono - .fromCallable { - SlateDb.newWriteBatch().use { batch -> - operations.forEach { op -> - when (op) { - is BatchOperation.Put -> batch.put(op.key, op.value) - is BatchOperation.Delete -> batch.delete(op.key) - is BatchOperation.Increment -> batch.merge(op.key, op.delta.toSlateBytes()) - } + .defer { + // Fresh batch per subscription (db.write consumes it). doFinally close + // pins it across the async write so UniFFI's cleaner cannot free the + // Rust handle mid-flight; close is idempotent. + val batch = WriteBatch() + operations.forEach { op -> + when (op) { + is BatchOperation.Put -> batch.put(op.key, op.value) + is BatchOperation.Delete -> batch.delete(op.key) + is BatchOperation.Increment -> batch.merge(op.key, op.delta.toSlateBytes()) } - db.write(batch) } - }.subscribeOn(scheduler) - .then() + Mono.fromFuture { db.write(batch) }.doFinally { batch.close() } + }.then() - override fun close() { - db.close() - } + override fun close(): Mono = Mono.fromFuture { db.shutdown() } } diff --git a/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt index 2322ff24..32af99bc 100644 --- a/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt +++ b/engine/src/test/kotlin/com/kakao/actionbase/engine/datastore/SlateDBDatastoreCompatibilityTest.kt @@ -1,6 +1,7 @@ package com.kakao.actionbase.engine.datastore import com.kakao.actionbase.v2.engine.storage.slatedb.BatchOperation +import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbConnections import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable import com.kakao.actionbase.v2.engine.storage.slatedb.incrementMergeOperator import com.kakao.actionbase.v2.engine.storage.slatedb.toLong @@ -14,8 +15,8 @@ import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.io.TempDir -import io.slatedb.SlateDb -import io.slatedb.SlateDbConfig +import io.slatedb.uniffi.DbBuilder +import io.slatedb.uniffi.ObjectStore /** * SlateDB compatibility test. @@ -38,18 +39,22 @@ class SlateDBDatastoreCompatibilityTest : DatastoreCompatibilityTest() { ) { assumeTrue(enabled, "SLATEDB_TEST=true not set") tempDir = dir - SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) - val db = - SlateDb.builder("data", "file://${tempDir.toAbsolutePath()}", null).use { builder -> - builder.withMergeOperator(incrementMergeOperator) - builder.build() + // Triggers native-library extraction + Slatedb.initLogging on the + // first call, then no-ops on subsequent ones. + SlateDbConnections.ensureInitialized() + val dbFuture = + ObjectStore.resolve("file://${tempDir.toAbsolutePath()}").use { objectStore -> + DbBuilder("data", objectStore).use { builder -> + builder.withMergeOperator(incrementMergeOperator) + builder.build() + } } - table = SlateDbTable.create(db) + table = SlateDbTable.create(dbFuture.join()) } @AfterAll fun tearDownSlateDB() { - table?.close() + table?.close()?.block() } override fun createStore(): StorageOperations = SlateDBOperations(table!!) diff --git a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt index c3cc9e75..94fb22b4 100644 --- a/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt +++ b/engine/src/test/kotlin/com/kakao/actionbase/v2/engine/storage/slatedb/SlateDbTableTest.kt @@ -8,8 +8,8 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir -import io.slatedb.SlateDb -import io.slatedb.SlateDbConfig +import io.slatedb.uniffi.DbBuilder +import io.slatedb.uniffi.ObjectStore import reactor.test.StepVerifier class SlateDbTableTest { @@ -20,19 +20,23 @@ class SlateDbTableTest { @BeforeEach fun setUp() { - SlateDb.initLogging(SlateDbConfig.LogLevel.INFO) - - val db = - SlateDb.builder("data", "file://${tempDir.toAbsolutePath()}", null).use { builder -> - builder.withMergeOperator(incrementMergeOperator) - builder.build() + // Triggers native-library extraction + Slatedb.initLogging on the + // first call, then no-ops on subsequent ones. + SlateDbConnections.ensureInitialized() + + val dbFuture = + ObjectStore.resolve("file://${tempDir.toAbsolutePath()}").use { objectStore -> + DbBuilder("data", objectStore).use { builder -> + builder.withMergeOperator(incrementMergeOperator) + builder.build() + } } - table = SlateDbTable.create(db) + table = SlateDbTable.create(dbFuture.join()) } @AfterEach fun tearDown() { - table.close() + table.close().block() } @Test