Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions qbit-core/src/commonMain/kotlin/qbit/Conn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import qbit.index.Indexer
import qbit.index.InternalDb
import qbit.index.RawEntity
import qbit.ns.Namespace
import qbit.resolving.lastWriterWinsResolve
import qbit.resolving.crdtResolve
import qbit.resolving.logsDiff
import qbit.serialization.*
import qbit.spi.Storage
Expand Down Expand Up @@ -122,14 +122,14 @@ class QConn(
}
}

override suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
override suspend fun update(trxLog: TrxLog, baseDb: InternalDb, newLog: TrxLog, newDb: InternalDb) {
val (log, db) =
if (hasConcurrentTrx(trxLog)) {
mergeLogs(trxLog, this.trxLog, newLog, newDb)
mergeLogs(trxLog, this.trxLog, newLog, baseDb, newDb)
} else {
newLog to newDb
}
storage.overwrite(Namespace("refs")["head"], newLog.hash.bytes)
storage.overwrite(Namespace("refs")["head"], log.hash.bytes)
this.trxLog = log
this.db = db
}
Expand All @@ -141,6 +141,7 @@ class QConn(
baseLog: TrxLog,
committedLog: TrxLog,
committingLog: TrxLog,
baseDb: InternalDb,
newDb: InternalDb
): Pair<TrxLog, InternalDb> {
val logsDifference = logsDiff(baseLog, committedLog, committingLog, resolveNode)
Expand All @@ -149,7 +150,7 @@ class QConn(
.logAEntities()
.toEavsList()
val reconciliationEavs = logsDifference
.reconciliationEntities(lastWriterWinsResolve { db.attr(it) })
.reconciliationEntities(crdtResolve(baseDb::pullEntity, db::attr))
.toEavsList()

val mergedDb = newDb
Expand Down
61 changes: 48 additions & 13 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0..31
val listRange = 32..63
val pnCounterRange = 64..95
val registerRange = 96..127

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,22 +36,23 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> ofCode((code - listRange.first).toByte())?.list()
in pnCounterRange -> ofCode((code - pnCounterRange.first).toByte())?.counter()
in registerRange -> ofCode((code - registerRange.first).toByte())?.register()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) { // TODO REFACTOR
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Рефактор что, как, зачем?

is Boolean -> QBoolean as DataType<T>
is Byte -> QByte as DataType<T>
is Int -> QInt as DataType<T>
is Long -> QLong as DataType<T>
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}
Expand All @@ -57,9 +63,25 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in listRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef
fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in pnCounterRange

fun register(): QRegister<T> {
require(!(this is QList<*> || this is QCounter || this is QRegister)) { "Nested wrappers is not allowed" }
return QRegister(this)
}

fun isRegister(): Boolean = code in registerRange

fun ref(): Boolean = this == QRef ||
this is QList<*> && this.itemsType == QRef ||
this is QRegister<*> && this.itemsType == QRef

fun value(): Boolean = !ref()

Expand All @@ -73,15 +95,28 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
is QRegister<*> -> this.itemsType.typeClass()
QRef -> Any::class
}
}

}

data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>() {

override val code = (32 + itemsType.code).toByte()
override val code = (listRange.first + itemsType.code).toByte()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (pnCounterRange.first + primitiveType.code).toByte()

}

data class QRegister<out I : Any>(val itemsType: DataType<I>) : DataType<I>() {

override val code = (registerRange.first + itemsType.code).toByte()

}

Expand Down Expand Up @@ -134,4 +169,4 @@ object QGid : DataType<Gid>() {
}

fun isListOfVals(list: List<Any>?) =
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true // TODO REFACTOR
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тот же вопрос

14 changes: 14 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/Register.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package qbit.api.model

import kotlinx.serialization.Serializable

@Serializable
class Register<T>(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне тут не нравится всё

  1. Нельзя использовать изменяемые структуры данных
  2. почему АПИ позволяет создание сразу "переполненого регистра"?
  3. У меня точно было это в голове и кажись мы где-то в чатиках обсуждали, что регистры должны быть обычными типами данных, на которые в схеме навешана функция свёртки списка в одно значение
  4. Я щяс чёт ваще не вижу, чтобы этот тип добавлял какую-то ценность относительно списков

private var entries: List<T>
) {
fun getValues(): List<T> = entries

fun setValue(t: T) {
entries = listOf(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.SerializersModuleCollector
import qbit.api.QBitException
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.Eav
import qbit.api.model.Entity
import qbit.api.model.Tombstone
import qbit.api.model.*
import qbit.api.tombstone
import qbit.collections.IdentityMap
import qbit.factoring.*
Expand Down Expand Up @@ -99,6 +96,12 @@ internal class EntityEncoder(
ValueKind.REF_LIST -> {
serializeRefList(value as Iterable<Any>)
}
ValueKind.VALUE_REGISTER -> {
(value as Register<Any>).getValues()
}
ValueKind.REF_REGISTER -> {
serializeRefList((value as Register<Any>).getValues())
}
}

val fieldPointer = Pointer(
Expand Down Expand Up @@ -185,7 +188,7 @@ internal class EntityEncoder(

enum class ValueKind {

SCALAR_VALUE, SCALAR_REF, VALUE_LIST, REF_LIST;
SCALAR_VALUE, SCALAR_REF, VALUE_LIST, REF_LIST, VALUE_REGISTER, REF_REGISTER;

companion object {
fun of(descriptor: SerialDescriptor, index: Int, value: Any): ValueKind {
Expand All @@ -194,7 +197,10 @@ enum class ValueKind {
isScalarValue(value) -> {
SCALAR_VALUE
}
isScalarRef(elementDescriptor) -> {
isScalarRef(
elementDescriptor,
value
) -> {
SCALAR_REF
}
isValueList(
Expand All @@ -209,6 +215,18 @@ enum class ValueKind {
) -> {
REF_LIST
}
isValueRegister(
elementDescriptor,
value
) -> {
VALUE_REGISTER
}
isRefRegister(
elementDescriptor,
value
) -> {
REF_REGISTER
}
else -> {
throw AssertionError("Writing primitive via encodeSerializableElement")
}
Expand All @@ -219,8 +237,8 @@ enum class ValueKind {
// other primitive values are encoded directly via encodeXxxElement
value is Gid || value is ByteArray

private fun isScalarRef(elementDescriptor: SerialDescriptor) =
elementDescriptor.kind == StructureKind.CLASS
private fun isScalarRef(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.CLASS && value !is Register<*>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!is Register<*> надо где-то заныкать в виде isNotIntrinsic


private fun isValueList(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.LIST &&
Expand All @@ -231,6 +249,13 @@ enum class ValueKind {
private fun isRefList(elementDescriptor: SerialDescriptor, value: Any) =
elementDescriptor.kind == StructureKind.LIST && value is List<*>

private fun isValueRegister(elementDescriptor: SerialDescriptor, value: Any) =
value is Register<*> && //TODO DEDUPLICATE
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

С чем?

(elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind is PrimitiveKind ||
elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind == StructureKind.LIST) // ByteArray

private fun isRefRegister(elementDescriptor: SerialDescriptor, value: Any) = // TODO REFACTOR
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как?

value is Register<*> && elementDescriptor.getElementDescriptor(0).getElementDescriptor(0).kind is StructureKind.CLASS
}

}
Expand Down
4 changes: 2 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class IndexDb(
val attrValues = rawEntity.entries.map {
val attr = schema[it.key]
require(attr != null) { "There is no attribute with name ${it.key}" }
require(attr.list || it.value.size == 1) { "Corrupted ${attr.name} of $gid - it is scalar, but multiple values has been found: ${it.value}" }
require(attr.list || DataType.ofCode(attr.type)!!.isRegister() || it.value.size == 1) { "Corrupted ${attr.name} of $gid - it is scalar, but multiple values has been found: ${it.value}" }
val value =
if (attr.list) it.value.map { e -> fixNumberType(attr, e) }
if (attr.list || DataType.ofCode(attr.type)!!.isRegister()) it.value.map { e -> fixNumberType(attr, e) }
else fixNumberType(attr, it.value[0])
attr to value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package qbit.resolving

import kotlinx.coroutines.flow.toList
import qbit.api.Instances
import qbit.api.QBitException
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.api.model.*
import qbit.index.RawEntity
import qbit.serialization.*
import qbit.trx.TrxLog
Expand Down Expand Up @@ -72,6 +71,59 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr<Any>?): (Li
}
}

internal fun crdtResolve(
resolveEntity: (Gid) -> StoredEntity?,
resolveAttrName: (String) -> Attr<Any>?
): (List<PersistedEav>, List<PersistedEav>) -> List<Eav> = { eavsFromA, eavsFromB ->
require(eavsFromA.isNotEmpty()) { "eavsFromA should be not empty" }
require(eavsFromB.isNotEmpty()) { "eavsFromB should be not empty" }

val gid = eavsFromA[0].eav.gid
val attr = resolveAttrName(eavsFromA[0].eav.attr)
?: throw IllegalArgumentException("Cannot resolve ${eavsFromA[0].eav.attr}")

when {
// temporary dirty hack until crdt counter or custom resolution strategy support is implemented
attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav)
attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct()
DataType.ofCode(attr.type)!!.isCounter() -> {
val latestFromA = eavsFromA.maxByOrNull { it.timestamp }!!.eav.value
val latestFromB = eavsFromB.maxByOrNull { it.timestamp }!!.eav.value
val previous = resolveEntity(gid)?.tryGet(attr)

listOf(
if (previous != null)
Eav(
eavsFromA[0].eav.gid,
eavsFromA[0].eav.attr,
if (previous is Byte && latestFromA is Byte && latestFromB is Byte) latestFromA + latestFromB - previous
else if (previous is Int && latestFromA is Int && latestFromB is Int) latestFromA + latestFromB - previous
else if (previous is Long && latestFromA is Long && latestFromB is Long) latestFromA + latestFromB - previous
else throw QBitException("Unexpected counter value type for eav with gid=$gid, attr=$attr")
)
else
Eav(
eavsFromA[0].eav.gid,
eavsFromA[0].eav.attr,
if (latestFromA is Byte && latestFromB is Byte) latestFromA + latestFromB
else if (latestFromA is Int && latestFromB is Int) latestFromA + latestFromB
else if (latestFromA is Long && latestFromB is Long) latestFromA + latestFromB
else throw QBitException("Unexpected counter value type for eav with gid=$gid, attr=$attr")
)
)
}
DataType.ofCode(attr.type)!!.isRegister() -> {
val latestFromA =
eavsFromA.maxOf { it.timestamp }.let { timestamp -> eavsFromA.filter { it.timestamp == timestamp } }
val latestFromB =
eavsFromB.maxOf { it.timestamp }.let { timestamp -> eavsFromB.filter { it.timestamp == timestamp } }

latestFromA.map { it.eav } + latestFromB.map { it.eav }
}
else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav)
}
}

internal fun findBaseNode(node1: Node<Hash>, node2: Node<Hash>, nodesDepth: Map<Hash, Int>): Node<Hash> {
return when {
node1 == node2 -> node1
Expand Down
Loading