Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions engine/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ dependencies {
implementation(Dependencies.Logging.SLF4J_API)
implementation(Dependencies.Logging.LOGBACK_CLASSIC)

// SlateDB (Maven Central)
implementation("io.slatedb:slatedb:0.11.0")
// SlateDB (Maven Central, UniFFI bindings)
implementation("io.slatedb:slatedb-uniffi:0.12.1")

// HBase
implementation(Dependencies.HBase.CLIENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,15 @@ package com.kakao.actionbase.v2.engine.storage.slatedb
import com.kakao.actionbase.v2.engine.util.getLogger

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

import io.slatedb.SlateDb
import io.slatedb.SlateDbConfig
import io.slatedb.uniffi.DbBuilder
import io.slatedb.uniffi.LogLevel
import io.slatedb.uniffi.ObjectStore
import io.slatedb.uniffi.Slatedb
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// The SlateDB C library uses a single global Tokio runtime that does not support
// concurrent block_on calls from multiple threads. All native FFI calls are routed
// through this global single-thread scheduler to prevent concurrent runtime entries.
//
// Uses Schedulers.fromExecutorService rather than Schedulers.newSingle to avoid
// marking the worker thread as Reactor NonBlocking. NonBlocking threads are
// monitored by BlockHound, which conflicts with the intentional blocking FFI calls.
object SlateDbScheduler {
val INSTANCE: reactor.core.scheduler.Scheduler =
Schedulers.fromExecutorService(
Executors.newSingleThreadExecutor { r -> Thread(r, "slatedb-worker") },
"slatedb-worker",
)
}

object SlateDbConnections {
private val logger = getLogger()

Expand All @@ -34,8 +20,11 @@ object SlateDbConnections {

fun ensureInitialized() {
if (initialized.compareAndSet(false, true)) {
logger.info("Initializing SlateDB (native library loaded from JAR classpath)")
SlateDb.initLogging(SlateDbConfig.LogLevel.INFO)
extractSlateDbNativeLibrary()
logger.info("Initializing SlateDB (UniFFI native library loaded from JAR classpath)")
// The second argument is an optional foreign log callback; null routes
// log records to SlateDB's default tracing formatter on stderr.
Slatedb.initLogging(LogLevel.INFO, null)
}
}

Expand All @@ -47,18 +36,21 @@ object SlateDbConnections {

return connections.computeIfAbsent(cacheKey) { key ->
Mono
.fromCallable {
.fromFuture {
ensureInitialized()
val db =
SlateDb.builder(dbPath, url, null).use { builder ->
ObjectStore.resolve(url).use { objectStore ->
DbBuilder(dbPath, objectStore).use { builder ->
builder.withMergeOperator(incrementMergeOperator)
builder.build()
}
SlateDbTable.create(db)
}.subscribeOn(Schedulers.boundedElastic())
.doOnSuccess {
logger.info("Successfully opened SlateDB connection for cacheKey: {}", key)
}.doOnError { error ->
}
}
// ObjectStore.resolve and the synchronous DbBuilder calls touch the
// foreign runtime; route them off any reactor event loop.
.subscribeOn(Schedulers.boundedElastic())
.map { db -> SlateDbTable.create(db) }
.doOnSuccess { logger.info("Successfully opened SlateDB connection for cacheKey: {}", key) }
.doOnError { error ->
logger.error("Failed to open SlateDB connection for cacheKey: {}", key, error)
connections.remove(key)
}.cache()
Expand All @@ -73,20 +65,13 @@ object SlateDbConnections {
fun closeConnections(): Mono<Void> {
val closeMonos =
connections.entries.map { (key, tableMono) ->
tableMono
.flatMap { table ->
Mono
.fromRunnable<Void> {
try {
table.close()
logger.info("Closed SlateDB connection for cacheKey: {}", key)
} catch (e: Exception) {
logger.error("Error closing SlateDB connection for cacheKey: {}", key, e)
}
// Close on the same single-thread scheduler so it is enqueued
// after all pending FFI operations, preventing use-after-close.
}.subscribeOn(SlateDbScheduler.INSTANCE)
}
tableMono.flatMap { table ->
table
.close()
.doOnSuccess { logger.info("Closed SlateDB connection for cacheKey: {}", key) }
.doOnError { error -> logger.error("Error closing SlateDB connection for cacheKey: {}", key, error) }
.onErrorResume { Mono.empty() }
}
}
return Mono.`when`(closeMonos).doFinally { connections.clear() }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.kakao.actionbase.v2.engine.storage.slatedb

import java.nio.file.Files
import java.nio.file.StandardCopyOption

// Workaround for io.slatedb:slatedb-uniffi:0.12.x. The Java bindings are
// generated by IronCoreLabs/uniffi-bindgen-java, whose FFM loader resolves
// the library via `System.loadLibrary` / `System.load` and intentionally does
// not extract bundled natives from the classpath — consumers pass an absolute
// path via the `uniffi.component.<ns>.libraryOverride` system property
// (see IronCoreLabs/uniffi-bindgen-java#58 and #59). The slatedb jar happens
// to ship the libraries as JNA-style resources, but no loader uses them.
//
// This helper materializes the OS/arch-specific resource to a temp file and
// sets that property. Delete this file if upstream ever ships a
// classpath-aware loader.
// Workaround for io.slatedb:slatedb-uniffi:0.12.x. The Java bindings generated by
// IronCoreLabs/uniffi-bindgen-java resolve the native library via System.loadLibrary /
// System.load and deliberately do not extract classpath-bundled natives; consumers are
// expected to hand over an absolute path through the
// `uniffi.component.<ns>.libraryOverride` system property
// (see IronCoreLabs/uniffi-bindgen-java#58 and #59). The slatedb jar happens to ship
// the libraries as JNA-style resources, but no loader ever reads them.
//
// This helper copies the OS/arch-specific resource to a temp file and points the
// override property at it. Remove this file if upstream ever ships a classpath-aware
// loader.
private const val LIBRARY_OVERRIDE_PROPERTY = "uniffi.component.slatedb.libraryOverride"

internal fun extractSlateDbNativeLibrary() {
    // Respect an explicitly pinned library path (e.g. local dev against a
    // built-from-source dylib) and do nothing.
    if (System.getProperty(LIBRARY_OVERRIDE_PROPERTY) != null) return

    val os = System.getProperty("os.name").lowercase()
    val arch = System.getProperty("os.arch").lowercase()

    // Map the JVM's OS name onto the bundled resource directory prefix and the
    // platform-specific library file name.
    val (osDir, libFileName) =
        when {
            "mac" in os || "darwin" in os -> "darwin" to "libslatedb_uniffi.dylib"
            "linux" in os -> "linux" to "libslatedb_uniffi.so"
            "windows" in os -> "win32" to "slatedb_uniffi.dll"
            else -> error("Unsupported OS for SlateDB UniFFI native library: $os")
        }

    // Normalize the JVM's architecture string onto the resource directory suffix.
    val archDir =
        when {
            "aarch64" in arch || "arm64" in arch -> "aarch64"
            "amd64" in arch || "x86_64" in arch || arch == "x86-64" -> "x86-64"
            else -> error("Unsupported architecture for SlateDB UniFFI native library: $arch")
        }

    val resourcePath = "$osDir-$archDir/$libFileName"
    val stream =
        Thread.currentThread().contextClassLoader.getResourceAsStream(resourcePath)
            ?: error("SlateDB UniFFI native library not bundled at classpath resource $resourcePath")

    // Materialize the resource to a temp file, keeping the platform extension so the
    // dynamic linker accepts it; deleteOnExit cleans up on normal JVM shutdown.
    val extension = "." + libFileName.substringAfterLast('.')
    val target = Files.createTempFile("slatedb_uniffi", extension)
    target.toFile().deleteOnExit()
    stream.use { Files.copy(it, target, StandardCopyOption.REPLACE_EXISTING) }

    System.setProperty(LIBRARY_OVERRIDE_PROPERTY, target.toAbsolutePath().toString())
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package com.kakao.actionbase.v2.engine.storage.slatedb
import java.nio.ByteBuffer
import java.nio.ByteOrder

import io.slatedb.SlateDb
import io.slatedb.SlateDbKeyValue
import io.slatedb.SlateDbMergeOperator
import io.slatedb.uniffi.Db
import io.slatedb.uniffi.DbIterator
import io.slatedb.uniffi.MergeOperator
import io.slatedb.uniffi.WriteBatch
import reactor.core.publisher.Mono

fun Long.toSlateBytes(): ByteArray =
Expand All @@ -18,7 +19,7 @@ fun Long.toSlateBytes(): ByteArray =
/**
 * Decodes this byte array as a big-endian (network-order) signed 64-bit value.
 * Throws java.nio.BufferUnderflowException when fewer than 8 bytes are present.
 */
fun ByteArray.toLong(): Long {
    val buffer = ByteBuffer.wrap(this).order(ByteOrder.BIG_ENDIAN)
    return buffer.long
}

val incrementMergeOperator =
SlateDbMergeOperator { _, existingValue, operand ->
MergeOperator { _, existingValue, operand ->
val current = existingValue?.toLong() ?: 0L
val delta = operand.toLong()
(current + delta).toSlateBytes()
Expand All @@ -40,7 +41,7 @@ sealed class BatchOperation {
) : BatchOperation()
}

interface SlateDbTable : AutoCloseable {
interface SlateDbTable {
fun get(key: ByteArray): Mono<ByteArray>

fun put(
Expand All @@ -64,91 +65,92 @@ interface SlateDbTable : AutoCloseable {

fun batch(operations: List<BatchOperation>): Mono<Void>

fun close(): Mono<Void>

companion object {
fun create(db: SlateDb): SlateDbTable = SlateDbTableImpl(db)
fun create(db: Db): SlateDbTable = SlateDbTableImpl(db)
}
}

internal class SlateDbTableImpl(
private val db: SlateDb,
private val db: Db,
) : SlateDbTable {
// The SlateDB C library uses a single global Tokio runtime with block_on, which
// does not support concurrent calls from multiple threads. The global single-thread
// scheduler serializes all native FFI calls across all database instances.
private val scheduler = SlateDbScheduler.INSTANCE
// All `Mono.fromFuture { db.<op>(..) }` calls use the Supplier overload so
// the underlying CompletableFuture is created at subscription time, not
// when the Mono is constructed. The eager overload would start every
// operation in a chain like `put.then(delete).then(get)` immediately,
// racing them against each other.

override fun get(key: ByteArray): Mono<ByteArray> =
Mono
.fromCallable { db.get(key) }
.flatMap { Mono.justOrEmpty(it) }
.subscribeOn(scheduler)
Mono.fromFuture { db.get(key) }.flatMap { Mono.justOrEmpty(it) }

override fun put(
key: ByteArray,
value: ByteArray,
): Mono<Void> =
Mono
.fromCallable { db.put(key, value) }
.subscribeOn(scheduler)
.then()
): Mono<Void> = Mono.fromFuture { db.put(key, value) }.then()

override fun delete(key: ByteArray): Mono<Void> =
Mono
.fromCallable { db.delete(key) }
.subscribeOn(scheduler)
.then()
override fun delete(key: ByteArray): Mono<Void> = Mono.fromFuture { db.delete(key) }.then()

override fun merge(
key: ByteArray,
value: ByteArray,
): Mono<Void> =
Mono
.fromCallable { db.merge(key, value) }
.subscribeOn(scheduler)
.then()
): Mono<Void> = Mono.fromFuture { db.merge(key, value) }.then()

override fun flush(): Mono<Void> =
Mono
.fromCallable { db.flush() }
.subscribeOn(scheduler)
.then()
override fun flush(): Mono<Void> = Mono.fromFuture { db.flush() }

override fun scanPrefix(
prefix: ByteArray,
limit: Int,
): Mono<List<Pair<ByteArray, ByteArray>>> =
Mono
.fromCallable {
.fromFuture { db.scanPrefix(prefix) }
.flatMap { iterator -> drainIterator(iterator, limit) }

// DbIterator.next() returns CompletableFuture<KeyValue?>; null marks end-of-stream.
// Drain by chaining continuations rather than .get(), so the calling thread is
// never blocked while UniFFI's Tokio runtime fetches the next block.
private fun drainIterator(
iterator: DbIterator,
limit: Int,
): Mono<List<Pair<ByteArray, ByteArray>>> =
Mono
.create<List<Pair<ByteArray, ByteArray>>> { sink ->
val results = mutableListOf<Pair<ByteArray, ByteArray>>()
db.scanPrefix(prefix).use { iterator ->
var kv: SlateDbKeyValue? = iterator.next()
var count = 0
while (kv != null && count < limit) {
results.add(kv.key() to kv.value())
count++
kv = iterator.next()
fun pump() {
if (results.size >= limit) {
sink.success(results.toList())
return
}
iterator.next().whenComplete { kv, err ->
when {
err != null -> sink.error(err)
kv == null -> sink.success(results.toList())
else -> {
results.add(kv.key() to kv.value())
pump()
}
}
}
}
results.toList()
}.subscribeOn(scheduler)
pump()
}.doFinally { iterator.close() }

override fun batch(operations: List<BatchOperation>): Mono<Void> =
Mono
.fromCallable {
SlateDb.newWriteBatch().use { batch ->
operations.forEach { op ->
when (op) {
is BatchOperation.Put -> batch.put(op.key, op.value)
is BatchOperation.Delete -> batch.delete(op.key)
is BatchOperation.Increment -> batch.merge(op.key, op.delta.toSlateBytes())
}
.defer {
// Fresh batch per subscription (db.write consumes it). doFinally close
// pins it across the async write so UniFFI's cleaner cannot free the
// Rust handle mid-flight; close is idempotent.
val batch = WriteBatch()
operations.forEach { op ->
when (op) {
is BatchOperation.Put -> batch.put(op.key, op.value)
is BatchOperation.Delete -> batch.delete(op.key)
is BatchOperation.Increment -> batch.merge(op.key, op.delta.toSlateBytes())
}
db.write(batch)
}
}.subscribeOn(scheduler)
.then()
Mono.fromFuture { db.write(batch) }.doFinally { batch.close() }
}.then()

override fun close() {
db.close()
}
override fun close(): Mono<Void> = Mono.fromFuture { db.shutdown() }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.kakao.actionbase.engine.datastore

import com.kakao.actionbase.v2.engine.storage.slatedb.BatchOperation
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbConnections
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable
import com.kakao.actionbase.v2.engine.storage.slatedb.incrementMergeOperator
import com.kakao.actionbase.v2.engine.storage.slatedb.toLong
Expand All @@ -14,8 +15,8 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.io.TempDir

import io.slatedb.SlateDb
import io.slatedb.SlateDbConfig
import io.slatedb.uniffi.DbBuilder
import io.slatedb.uniffi.ObjectStore

/**
* SlateDB compatibility test.
Expand All @@ -38,18 +39,22 @@ class SlateDBDatastoreCompatibilityTest : DatastoreCompatibilityTest() {
) {
assumeTrue(enabled, "SLATEDB_TEST=true not set")
tempDir = dir
SlateDb.initLogging(SlateDbConfig.LogLevel.INFO)
val db =
SlateDb.builder("data", "file://${tempDir.toAbsolutePath()}", null).use { builder ->
builder.withMergeOperator(incrementMergeOperator)
builder.build()
// Triggers native-library extraction + Slatedb.initLogging on the
// first call, then no-ops on subsequent ones.
SlateDbConnections.ensureInitialized()
val dbFuture =
ObjectStore.resolve("file://${tempDir.toAbsolutePath()}").use { objectStore ->
DbBuilder("data", objectStore).use { builder ->
builder.withMergeOperator(incrementMergeOperator)
builder.build()
}
}
table = SlateDbTable.create(db)
table = SlateDbTable.create(dbFuture.join())
}

@AfterAll
fun tearDownSlateDB() {
table?.close()
table?.close()?.block()
}

override fun createStore(): StorageOperations = SlateDBOperations(table!!)
Expand Down
Loading
Loading