Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object Instances {
val nextEid = Attr<Int>(
Gid(1, 5),
"Instance/nextEid",
QInt.code,
QInt.counter().code,
unique = false,
list = false
)
Expand Down
36 changes: 27 additions & 9 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0..31
val listRange = 32..63
val counterRange = 64..95

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,12 +35,12 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> ofCode((code - listRange.first).toByte())?.list()
in counterRange -> ofCode((code - counterRange.first).toByte())?.counter()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
is Boolean -> QBoolean as DataType<T>
Expand All @@ -46,7 +50,7 @@ sealed class DataType<out T : Any> {
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}
Expand All @@ -57,7 +61,14 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in listRange

fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in counterRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef

Expand All @@ -73,6 +84,7 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
QRef -> Any::class
}
}
Expand All @@ -81,7 +93,13 @@ sealed class DataType<out T : Any> {

data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>() {

override val code = (32 + itemsType.code).toByte()
override val code = (listRange.first + itemsType.code).toByte()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (counterRange.first + primitiveType.code).toByte()

}

Expand Down
43 changes: 30 additions & 13 deletions qbit-core/src/commonMain/kotlin/qbit/index/Index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package qbit.index

import qbit.api.db.QueryPred
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.tombstone
import qbit.platform.assert
Expand Down Expand Up @@ -61,17 +63,17 @@ class Index(
}
}

fun addFacts(facts: List<Eav>): Index =
addFacts(facts as Iterable<Eav>)
fun addFacts(facts: List<Eav>, resolveAttr: (String) -> Attr<*>? = { null }): Index =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Может resolveAttr уже в поля засунуть, раз такая пьянка?

addFacts(facts as Iterable<Eav>, resolveAttr)

fun addFacts(facts: Iterable<Eav>): Index {
fun addFacts(facts: Iterable<Eav>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val entities = facts
.groupBy { it.gid }
.map { it.key to it.value }
return add(entities)
return add(entities, resolveAttr)
}

fun add(entities: List<RawEntity>): Index {
fun add(entities: List<RawEntity>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val newEntities = HashMap(this.entities)

// eavs of removed or updated entities
Expand All @@ -82,12 +84,27 @@ class Index(
val (gid, eavs) = e

val isUpdate = eavs[0].attr != tombstone.name
val obsoleteEntity =
if (isUpdate) {
newEntities.put(gid, e)
} else {
newEntities.remove(gid)
}
val obsoleteEntity = newEntities.get(gid)

if (isUpdate) {
val crdts = obsoleteEntity?.second
?.filter {
val attr = resolveAttr(it.attr)
if (attr == null) {
false
} else {
DataType.ofCode(attr.type)!!.isCounter()
}
}
?.filter {
crdtEav -> eavs.none { it.attr == crdtEav.attr }
}
?: emptyList()
obsoleteEavs.removeAll(crdts)
newEntities.put(gid, RawEntity(gid, eavs + crdts))
} else {
newEntities.remove(gid)
}

if (obsoleteEntity != null) {
obsoleteEavs.addAll(obsoleteEntity.second)
Expand All @@ -103,8 +120,8 @@ class Index(
return Index(newEntities, newAveIndex)
}

fun add(e: RawEntity): Index {
return add(listOf(e))
fun add(e: RawEntity, resolveAttr: (String) -> Attr<*>? = { null }): Index {
return add(listOf(e), resolveAttr)
}

fun entityById(eid: Gid): Map<String, List<Any>>? =
Expand Down
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import qbit.api.gid.Gid
import qbit.api.model.*
import qbit.api.model.impl.AttachedEntity
import qbit.collections.LimitedPersistentMap
import qbit.trx.deoperationalize
import qbit.typing.typify
import kotlin.reflect.KClass

Expand All @@ -30,8 +31,8 @@ class IndexDb(

private val dataClassesCache = atomic<LimitedPersistentMap<Entity, Any>>(LimitedPersistentMap(1024))

override fun with(facts: Iterable<Eav>): InternalDb {
return IndexDb(index.addFacts(facts), serialModule)
override fun with(facts: Iterable<Eav>): IndexDb {
return IndexDb(index.addFacts(deoperationalize(this, facts.toList()), this::attr), serialModule)
}

override fun pullEntity(gid: Gid): StoredEntity? {
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Indexer(
.toList()
.map { it.entities() }
.fold(base ?: IndexDb(Index(), serialModule)) { db, n ->
IndexDb(db.index.add(n), serialModule)
db.with(n.flatMap { it.second })
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package qbit.resolving

import kotlinx.coroutines.flow.toList
import qbit.api.Instances
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.index.RawEntity
Expand Down Expand Up @@ -41,7 +41,12 @@ data class LogsDiff(
resolve(writesFromA[it]!!, writesFromB[it]!!)
}
}
return resolvingEavsByGid.values.map { RawEntity(it.first().gid, it) }
return resolvingEavsByGid
.filter { it.value.isNotEmpty() } // CRDT values in eavs are operations.
// Operations should not be created during merge
// So, it is possible to have "empty" entities there
// They should be filtered out
.values.map { RawEntity(it.first().gid, it) }
}

fun logAEntities(): List<RawEntity> {
Expand All @@ -65,9 +70,8 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr<Any>?): (Li
?: throw IllegalArgumentException("Cannot resolve ${eavsFromA[0].eav.attr}")

when {
// temporary dirty hack until crdt counter or custom resolution strategy support is implemented
attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav)
attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct()
DataType.ofCode(attr.type)!!.isCounter() -> ArrayList()
else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav)
}
}
Expand Down
30 changes: 26 additions & 4 deletions qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SchemaBuilder(private val serialModule: SerializersModule) {
?: throw QBitException("Cannot find descriptor for $type")
val eb = EntityBuilder<T>(descr)
eb.body()
attrs.addAll(schemaFor(descr, eb.uniqueProps))
attrs.addAll(schemaFor(descr, eb.uniqueProps, eb.counters))
}

}
Expand All @@ -33,6 +33,8 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {

internal val uniqueProps = HashSet<String>()

internal val counters = HashSet<String>()

fun uniqueInt(prop: KProperty1<T, Int>) {
uniqueAttr(prop)
}
Expand All @@ -42,21 +44,41 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {
}

private fun uniqueAttr(prop: KProperty1<T, *>) {
uniqueProps.add(getAttrName(prop))
}

fun byteCounter(prop: KProperty1<T, Byte>) {
counter(prop)
}

fun intCounter(prop: KProperty1<T, Int>) {
counter(prop)
}

fun longCounter(prop: KProperty1<T, Long>) {
counter(prop)
}

private fun counter(prop: KProperty1<T, *>) {
counters.add(getAttrName(prop))
}

private fun getAttrName(prop: KProperty1<T, *>): String {
val (idx, _) = descr.elementNames
.withIndex().firstOrNull { (_, name) -> name == prop.name }
?: throw QBitException("Cannot find attr for ${prop.name} in $descr")
uniqueProps.add(AttrName(descr, idx).asString())
return AttrName(descr, idx).asString()
}

}

fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet()): List<Attr<Any>> {
fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet(), counters: Set<String> = emptySet()): List<Attr<Any>> {
return rootDesc.elementDescriptors
.withIndex()
.filter { rootDesc.getElementName(it.index) !in setOf("id", "gid") }
.map { (idx, desc) ->
val dataType = DataType.of(desc)
val attr = AttrName(rootDesc, idx).asString()
val dataType = if (attr in counters) DataType.of(desc).counter() else DataType.of(desc)
Attr<Any>(null, attr, dataType.code, attr in unique, dataType.isList())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal fun deserialize(ins: Input): Any {
private fun <T : Any> readMark(ins: Input, expectedMark: DataType<T>): Any {
return when (expectedMark) {
QBoolean -> (ins.readByte() == 1.toByte()) as T
QByte, QInt, QLong -> readLong(ins) as T
QByte, QInt, QLong, is QCounter<*> -> readLong(ins) as T

QBytes -> readLong(ins).let { count ->
readBytes(ins, count.toInt()) as T
Expand Down
49 changes: 49 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package qbit.trx

import qbit.api.QBitException
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.index.InternalDb

fun operationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { operationalizeCounter(db, it) }
}

private fun operationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)!!
val dataType = DataType.ofCode(attr.type)!!
return if (dataType.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (previous is Byte && fact.value is Byte) fact.value - previous
else if (previous is Int && fact.value is Int) fact.value - previous
else if (previous is Long && fact.value is Long) fact.value - previous
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}

fun deoperationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { deoperationalizeCounter(db, it) }
}

private fun deoperationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)
return if (attr != null && DataType.ofCode(attr.type)!!.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (fact.value is Byte) (previous as Number).toByte() + fact.value
else if (fact.value is Int) (previous as Number).toInt() + fact.value
else if (fact.value is Long) (previous as Number).toLong() + fact.value
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ internal class QTrx(
return QbitWriteResult(entityGraphRoot, curDb)
}
validate(curDb, updatedFacts)
factsBuffer.addAll(updatedFacts)
curDb = curDb.with(updatedFacts)
val operationalizedFacts = operationalize(curDb, updatedFacts)
factsBuffer.addAll(operationalizedFacts)
curDb = curDb.with(operationalizedFacts)

val res = if (facts.entityFacts[entityGraphRoot]!!.firstOrNull()?.gid in entities) {
entityGraphRoot
Expand Down
Loading