From 31eb7b7dca06699425155913096c988c29ddc736 Mon Sep 17 00:00:00 2001 From: Grigory Date: Fri, 25 Mar 2022 12:34:54 +0700 Subject: [PATCH 1/5] Implement the op-based counter prototype --- .../kotlin/qbit/api/model/DataTypes.kt | 36 +++++++++----- .../commonMain/kotlin/qbit/index/IndexDb.kt | 5 +- .../commonMain/kotlin/qbit/index/Indexer.kt | 2 +- .../qbit/resolving/ConflictResolving.kt | 15 +++++- .../kotlin/qbit/schema/SchemaDsl.kt | 15 ++++-- .../kotlin/qbit/serialization/Simple.kt | 2 +- .../kotlin/qbit/trx/Operationalization.kt | 49 +++++++++++++++++++ .../src/commonMain/kotlin/qbit/trx/Trx.kt | 5 +- .../src/commonTest/kotlin/qbit/FunTest.kt | 24 ++++++++- .../kotlin/qbit/OperationalizationTest.kt | 42 ++++++++++++++++ .../src/commonTest/kotlin/qbit/TestSchema.kt | 3 ++ .../src/commonTest/kotlin/qbit/TrxTest.kt | 22 +++++++-- .../kotlin/qbit/test/model/TestModels.kt | 4 ++ 13 files changed, 199 insertions(+), 25 deletions(-) create mode 100644 qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt create mode 100644 qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt diff --git a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt index ce35fdea..3c98d71a 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt @@ -31,14 +31,14 @@ sealed class DataType { private val values: Array> get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef) - fun ofCode(code: Byte): DataType<*>? = - if (code <= 19) { - values.firstOrNull { it.code == code } - } else { - values.map { it.list() }.firstOrNull { it.code == code } - } - - fun ofValue(value: T?): DataType? = when (value) { + fun ofCode(code: Byte): DataType<*>? = when(code) { + in 0..31 -> values.firstOrNull { it.code == code } + in 32..63 -> values.map { it.list() }.firstOrNull { it.code == code } + in 63..95 -> ofCode((code - 64).toByte())?.counter() + else -> null + } + + fun ofValue(value: T?): DataType = when (value) { is Boolean -> QBoolean as DataType is Byte -> QByte as DataType is Int -> QInt as DataType @@ -46,7 +46,7 @@ sealed class DataType { is String -> QString as DataType is ByteArray -> QBytes as DataType is Gid -> QGid as DataType - is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType + is List<*> -> value.firstOrNull()?.let { ofValue(it).list() } as DataType else -> QRef as DataType } } @@ -57,7 +57,14 @@ sealed class DataType { return QList(this) } - fun isList(): Boolean = (code.toInt().and(32)) > 0 + fun isList(): Boolean = code in 32..63 + + fun counter(): QCounter { + require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" } + return QCounter(this) + } + + fun isCounter(): Boolean = code in 64..95 fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef @@ -73,6 +80,7 @@ sealed class DataType { is QBytes -> ByteArray::class is QGid -> Gid::class is QList<*> -> this.itemsType.typeClass() + is QCounter<*> -> this.primitiveType.typeClass() QRef -> Any::class } } @@ -85,6 +93,12 @@ data class QList(val itemsType: DataType) : DataType>() } +data class QCounter(val primitiveType: DataType) : DataType() { + + override val code = (64 + primitiveType.code).toByte() + +} + object QBoolean : DataType() { override val code = 0.toByte() @@ -134,4 +148,4 @@ object QGid : DataType() { } fun isListOfVals(list: List?) = - list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true \ No newline at end of file + list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it).value() } ?: true \ No newline at end of file diff --git a/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt b/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt index 96e79231..9a17dbc8 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt @@ -14,6 +14,7 @@ import qbit.api.gid.Gid import qbit.api.model.* import qbit.api.model.impl.AttachedEntity import qbit.collections.LimitedPersistentMap +import qbit.trx.deoperationalize import qbit.typing.typify import kotlin.reflect.KClass @@ -30,8 +31,8 @@ class IndexDb( private val dataClassesCache = atomic>(LimitedPersistentMap(1024)) - override fun with(facts: Iterable): InternalDb { - return IndexDb(index.addFacts(facts), serialModule) + override fun with(facts: Iterable): IndexDb { + return IndexDb(index.addFacts(deoperationalize(this, facts.toList())), serialModule) } override fun pullEntity(gid: Gid): StoredEntity? { diff --git a/qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt b/qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt index 280e9033..7096fa9f 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt @@ -17,7 +17,7 @@ class Indexer( .toList() .map { it.entities() } .fold(base ?: IndexDb(Index(), serialModule)) { db, n -> - IndexDb(db.index.add(n), serialModule) + db.with(n.flatMap { it.second }) } } diff --git a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt index 4d3bdbc8..4e889a1e 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.toList import qbit.api.Instances import qbit.api.gid.Gid import qbit.api.model.Attr +import qbit.api.model.DataType import qbit.api.model.Eav import qbit.api.model.Hash import qbit.index.RawEntity @@ -41,7 +42,9 @@ data class LogsDiff( resolve(writesFromA[it]!!, writesFromB[it]!!) } } - return resolvingEavsByGid.values.map { RawEntity(it.first().gid, it) } + return resolvingEavsByGid + .filter { !it.value.isEmpty() } + .values.map { RawEntity(it.first().gid, it) } } fun logAEntities(): List { @@ -52,6 +55,15 @@ data class LogsDiff( } } + // This snippet is probably useless and should be wiped out + fun logBOperations(resolveAttrName: (String) -> Attr?): List { + return writesFromB.entries + .filter { DataType.ofCode(resolveAttrName(it.key.attr)!!.type)!!.isCounter() } + .flatMap { operationFromB -> + operationFromB.value.map { RawEntity(operationFromB.key.gid, listOf(it.eav)) } + } + } + private fun List.lastByTimestamp() = maxByOrNull { it.timestamp } @@ -68,6 +80,7 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr?): (Li // temporary dirty hack until crdt counter or custom resolution strategy support is implemented attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav) attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct() + DataType.ofCode(attr.type)!!.isCounter() -> ArrayList() else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav) } } diff --git a/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt b/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt index 384abb6e..de03fd47 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt @@ -24,7 +24,7 @@ class SchemaBuilder(private val serialModule: SerializersModule) { ?: throw QBitException("Cannot find descriptor for $type") val eb = EntityBuilder(descr) eb.body() - attrs.addAll(schemaFor(descr, eb.uniqueProps)) + attrs.addAll(schemaFor(descr, eb.uniqueProps, eb.counters)) } } @@ -33,6 +33,8 @@ class EntityBuilder(private val descr: SerialDescriptor) { internal val uniqueProps = HashSet() + internal val counters = HashSet() + fun uniqueInt(prop: KProperty1) { uniqueAttr(prop) } @@ -48,15 +50,22 @@ class EntityBuilder(private val descr: SerialDescriptor) { uniqueProps.add(AttrName(descr, idx).asString()) } + fun counter(prop: KProperty1) { + val (idx, _) = descr.elementNames + .withIndex().firstOrNull { (_, name) -> name == prop.name } + ?: throw QBitException("Cannot find attr for ${prop.name} in $descr") + counters.add(AttrName(descr, idx).asString()) + } + } -fun schemaFor(rootDesc: SerialDescriptor, unique: Set = emptySet()): List> { +fun schemaFor(rootDesc: SerialDescriptor, unique: Set = emptySet(), counters: Set = emptySet()): List> { return rootDesc.elementDescriptors .withIndex() .filter { rootDesc.getElementName(it.index) !in setOf("id", "gid") } .map { (idx, desc) -> - val dataType = DataType.of(desc) val attr = AttrName(rootDesc, idx).asString() + val dataType = if (attr in counters) DataType.of(desc).counter() else DataType.of(desc) Attr(null, attr, dataType.code, attr in unique, dataType.isList()) } } diff --git a/qbit-core/src/commonMain/kotlin/qbit/serialization/Simple.kt b/qbit-core/src/commonMain/kotlin/qbit/serialization/Simple.kt index f198ee6f..479daf90 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/serialization/Simple.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/serialization/Simple.kt @@ -176,7 +176,7 @@ internal fun deserialize(ins: Input): Any { private fun readMark(ins: Input, expectedMark: DataType): Any { return when (expectedMark) { QBoolean -> (ins.readByte() == 1.toByte()) as T - QByte, QInt, QLong -> readLong(ins) as T + QByte, QInt, QLong, is QCounter<*> -> readLong(ins) as T QBytes -> readLong(ins).let { count -> readBytes(ins, count.toInt()) as T diff --git a/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt b/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt new file mode 100644 index 00000000..5c2fe948 --- /dev/null +++ b/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt @@ -0,0 +1,49 @@ +package qbit.trx + +import qbit.api.QBitException +import qbit.api.model.DataType +import qbit.api.model.Eav +import qbit.index.InternalDb + +fun operationalize(db: InternalDb, facts: List): List { + return facts.map { operationalizeCounter(db, it) } +} + +private fun operationalizeCounter(db: InternalDb, fact: Eav): Eav { + val attr = db.attr(fact.attr)!! + val dataType = DataType.ofCode(attr.type)!! + return if (dataType.isCounter()) { + val previous = db.pullEntity(fact.gid)?.tryGet(attr) + if (previous != null) { + Eav( + fact.gid, + fact.attr, + if (previous is Byte && fact.value is Byte) fact.value - previous + else if (previous is Int && fact.value is Int) fact.value - previous + else if (previous is Long && fact.value is Long) fact.value - previous + else throw QBitException("Unexpected counter value type for $fact") + ) + } else fact + } else fact +} + +fun deoperationalize(db: InternalDb, facts: List): List { + return facts.map { deoperationalizeCounter(db, it) } +} + +private fun deoperationalizeCounter(db: InternalDb, fact: Eav): Eav { + val attr = db.attr(fact.attr) + return if (attr != null && DataType.ofCode(attr.type)!!.isCounter()) { + val previous = db.pullEntity(fact.gid)?.tryGet(attr) + if (previous != null) { + Eav( + fact.gid, + fact.attr, + if (previous is Byte && fact.value is Byte) previous + fact.value + else if (previous is Int && fact.value is Int) previous + fact.value + else if (previous is Long && fact.value is Long) previous + fact.value + else throw QBitException("Unexpected counter value type for $fact") + ) + } else fact + } else fact +} \ No newline at end of file diff --git a/qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt b/qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt index f598b549..2c4e9329 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt @@ -44,8 +44,9 @@ internal class QTrx( return QbitWriteResult(entityGraphRoot, curDb) } validate(curDb, updatedFacts) - factsBuffer.addAll(updatedFacts) - curDb = curDb.with(updatedFacts) + val operationalizedFacts = operationalize(curDb, updatedFacts) + factsBuffer.addAll(operationalizedFacts) + curDb = curDb.with(operationalizedFacts) val res = if (facts.entityFacts[entityGraphRoot]!!.firstOrNull()?.gid in entities) { entityGraphRoot diff --git a/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt b/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt index c53945e2..fcd1e495 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt @@ -400,7 +400,7 @@ class FunTest { assertEquals(bomb.country, storedBomb.country) assertEquals(bomb.optCountry, storedBomb.optCountry) assertEquals( - listOf(Country(12884901889, "Country1", 0), Country(4294967383, "Country3", 2)), + listOf(Country(12884901889, "Country1", 0), Country(4294967384, "Country3", 2)), storedBomb.countiesList ) // todo: assertEquals(bomb.countriesListOpt, storedBomb.countriesListOpt) @@ -574,4 +574,26 @@ class FunTest { assertEquals(Gid(nsk.id!!), trx2EntityAttrValues.first { it.attr.name == "City/region" }.value) } } + + @JsName("Test_counter_resolving") + @Test + fun `Test counter resolving`() { + runBlocking { + val conn = setupTestSchema() + val counter = IntCounterEntity(1, 10) + val trx = conn.trx() + trx.persist(counter) + trx.commit() + + val trx1 = conn.trx() + val trx2 = conn.trx() + trx1.persist(counter.copy(counter = 40)) + delay(100) + trx2.persist(counter.copy(counter = 70)) + trx1.commit() + trx2.commit() + + assertEquals(conn.db().pull(1)?.counter, 100) + } + } } \ No newline at end of file diff --git a/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt b/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt new file mode 100644 index 00000000..6aece635 --- /dev/null +++ b/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt @@ -0,0 +1,42 @@ +package qbit + +import qbit.api.gid.nextGids +import qbit.factoring.serializatoin.KSFactorizer +import qbit.test.model.IntCounterEntity +import qbit.trx.operationalize +import qbit.typing.qbitCoreTestsSerialModule +import kotlin.js.JsName +import kotlin.test.Test +import kotlin.test.assertEquals + +class OperationalizationTest { + + private val gids = qbit.api.gid.Gid(0, 0).nextGids() + + val factor = KSFactorizer(qbitCoreTestsSerialModule)::factor + + val emptyDb = dbOf(gids, *(bootstrapSchema.values + testSchema).toTypedArray()) + + @JsName("Counter_not_persisted_in_db_should_pass_as_is") + @Test + fun `Counter not persisted in db should pass as-is`() { + val counterEntity = IntCounterEntity(null, 10) + val facts = operationalize(emptyDb, factor(counterEntity, emptyDb::attr, gids).entityFacts.values.first()) + assertEquals(1, facts.size, "Factoring of single entity with single attr should produce single fact") + assertEquals("IntCounterEntity/counter", facts[0].attr) + assertEquals(10, facts[0].value) + } + + @JsName("Persisted_counter_should_turn_into_difference") + @Test + fun `Persisted counter should turn into difference`() { + val counterEntity = IntCounterEntity(1, 10) + val updatedDb = emptyDb.with(factor(counterEntity, emptyDb::attr, gids)) + + counterEntity.counter = 100 + val facts = operationalize(updatedDb, factor(counterEntity, updatedDb::attr, gids).entityFacts.values.first()) + assertEquals(1, facts.size, "Factoring of single entity with single attr should produce single fact") + assertEquals("IntCounterEntity/counter", facts[0].attr) + assertEquals(90, facts[0].value) + } +} \ No newline at end of file diff --git a/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt b/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt index fc7d77a9..a880b44a 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt @@ -36,6 +36,9 @@ val testSchema = schema(internalTestsSerialModule) { entity(NullableList::class) entity(NullableRef::class) entity(IntEntity::class) + entity(IntCounterEntity::class) { + counter(IntCounterEntity::counter) + } entity(ResearchGroup::class) entity(EntityWithByteArray::class) entity(EntityWithListOfBytes::class) diff --git a/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt b/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt index 840ccd9e..c470a022 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt @@ -6,18 +6,16 @@ import qbit.api.Attrs import qbit.api.Instances import qbit.api.QBitException import qbit.api.db.Conn -import qbit.api.db.attrIs import qbit.api.db.pull -import qbit.api.db.query import qbit.api.gid.Gid import qbit.api.gid.nextGids -import qbit.api.model.Attr import qbit.api.system.Instance import qbit.ns.Key import qbit.ns.ns import qbit.platform.runBlocking import qbit.spi.Storage import qbit.storage.MemStorage +import qbit.test.model.IntCounterEntity import qbit.test.model.Region import qbit.test.model.Scientist import qbit.test.model.testsSerialModule @@ -176,6 +174,24 @@ class TrxTest { } } + @JsName("Counter_test") + @Test + fun `Counter test`() { + runBlocking { + val conn = setupTestData() + val counterEntity = IntCounterEntity(1, 10) + val trx1 = conn.trx() + trx1.persist(counterEntity) + trx1.commit() + assertEquals(conn.db().pull(1)?.counter, counterEntity.counter) + counterEntity.counter = 90 + val trx2 = conn.trx() + trx2.persist(counterEntity) + trx2.commit() + assertEquals(conn.db().pull(1)?.counter, counterEntity.counter) + } + } + private suspend fun openEmptyConn(): Pair { val storage = MemStorage() val conn = qbit(storage, testsSerialModule) diff --git a/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt b/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt index d88500fc..a631f920 100644 --- a/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt +++ b/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt @@ -9,6 +9,9 @@ data class TheSimplestEntity(val id: Long?, val scalar: String) @Serializable data class IntEntity(val id: Long?, val int: Int) +@Serializable +data class IntCounterEntity(val id: Long?, var counter: Int) + @Serializable data class NullableIntEntity(val id: Long?, val int: Int?) @@ -307,6 +310,7 @@ val testsSerialModule = SerializersModule { contextual(ByteArrayEntity::class, ByteArrayEntity.serializer()) contextual(ListOfByteArraysEntity::class, ListOfByteArraysEntity.serializer()) contextual(IntEntity::class, IntEntity.serializer()) + contextual(IntCounterEntity::class, IntCounterEntity.serializer()) contextual(Region::class, Region.serializer()) contextual(ParentToChildrenTreeEntity::class, ParentToChildrenTreeEntity.serializer()) contextual(EntityWithRefsToSameType::class, EntityWithRefsToSameType.serializer()) From 9312847f30bd8cdd746f64b850b3f2a7e8414ac8 Mon Sep 17 00:00:00 2001 From: Grigory Date: Sun, 1 May 2022 14:05:46 +0700 Subject: [PATCH 2/5] some feedback improvements --- .../kotlin/qbit/api/model/DataTypes.kt | 20 +++++++++------ .../kotlin/qbit/schema/SchemaDsl.kt | 25 ++++++++++++++----- .../src/commonTest/kotlin/qbit/FunTest.kt | 8 +++--- .../kotlin/qbit/OperationalizationTest.kt | 5 ++-- .../src/commonTest/kotlin/qbit/TestSchema.kt | 2 +- .../src/commonTest/kotlin/qbit/TrxTest.kt | 21 ++++++++-------- .../kotlin/qbit/test/model/TestModels.kt | 2 +- 7 files changed, 50 insertions(+), 33 deletions(-) diff --git a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt index 3c98d71a..c89b086a 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt @@ -21,6 +21,10 @@ import kotlin.reflect.KClass * - List */ +val scalarRange = 0..31 +val listRange = 32..63 +val counterRange = 64..95 + @Suppress("UNCHECKED_CAST") sealed class DataType { @@ -32,13 +36,13 @@ sealed class DataType { get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef) fun ofCode(code: Byte): DataType<*>? = when(code) { - in 0..31 -> values.firstOrNull { it.code == code } - in 32..63 -> values.map { it.list() }.firstOrNull { it.code == code } - in 63..95 -> ofCode((code - 64).toByte())?.counter() + in scalarRange -> values.firstOrNull { it.code == code } + in listRange -> values.map { it.list() }.firstOrNull { it.code == code } + in counterRange -> ofCode((code - 64).toByte())?.counter() else -> null } - fun ofValue(value: T?): DataType = when (value) { + fun ofValue(value: T?): DataType? = when (value) { is Boolean -> QBoolean as DataType is Byte -> QByte as DataType is Int -> QInt as DataType @@ -46,7 +50,7 @@ sealed class DataType { is String -> QString as DataType is ByteArray -> QBytes as DataType is Gid -> QGid as DataType - is List<*> -> value.firstOrNull()?.let { ofValue(it).list() } as DataType + is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType? else -> QRef as DataType } } @@ -57,14 +61,14 @@ sealed class DataType { return QList(this) } - fun isList(): Boolean = code in 32..63 + fun isList(): Boolean = code in listRange fun counter(): QCounter { require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" } return QCounter(this) } - fun isCounter(): Boolean = code in 64..95 + fun isCounter(): Boolean = code in counterRange fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef @@ -148,4 +152,4 @@ object QGid : DataType() { } fun isListOfVals(list: List?) = - list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it).value() } ?: true \ No newline at end of file + list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true \ No newline at end of file diff --git a/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt b/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt index de03fd47..930f696e 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt @@ -44,17 +44,30 @@ class EntityBuilder(private val descr: SerialDescriptor) { } private fun uniqueAttr(prop: KProperty1) { - val (idx, _) = descr.elementNames - .withIndex().firstOrNull { (_, name) -> name == prop.name } - ?: throw QBitException("Cannot find attr for ${prop.name} in $descr") - uniqueProps.add(AttrName(descr, idx).asString()) + uniqueProps.add(getAttrName(prop)) + } + + fun byteCounter(prop: KProperty1) { + counter(prop) + } + + fun intCounter(prop: KProperty1) { + counter(prop) + } + + fun longCounter(prop: KProperty1) { + counter(prop) + } + + private fun counter(prop: KProperty1) { + counters.add(getAttrName(prop)) } - fun counter(prop: KProperty1) { + private fun getAttrName(prop: KProperty1): String { val (idx, _) = descr.elementNames .withIndex().firstOrNull { (_, name) -> name == prop.name } ?: throw QBitException("Cannot find attr for ${prop.name} in $descr") - counters.add(AttrName(descr, idx).asString()) + return AttrName(descr, idx).asString() } } diff --git a/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt b/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt index fcd1e495..1959fa93 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/FunTest.kt @@ -459,9 +459,9 @@ class FunTest { trx1.persist(eBrewer.copy(name = "Im different change")) val trx2 = conn.trx() trx2.persist(eCodd.copy(name = "Im change 2")) - delay(100) trx2.persist(pChen.copy(name = "Im different change")) trx1.commit() + delay(1) trx2.commit() conn.db { assertEquals("Im change 2", it.pull(eCodd.id!!)!!.name) @@ -540,6 +540,7 @@ class FunTest { ) ) trx1.commit() + delay(1) trx2.commit() conn.db { assertEquals("Im change 2", it.pull(eCodd.id!!)!!.name) @@ -575,9 +576,9 @@ class FunTest { } } - @JsName("Test_counter_resolving") + @JsName("qbit_should_accumulate_concurrent_increments_of_counter") @Test - fun `Test counter resolving`() { + fun `qbit should accumulate concurrent increments of counter`() { runBlocking { val conn = setupTestSchema() val counter = IntCounterEntity(1, 10) @@ -588,7 +589,6 @@ class FunTest { val trx1 = conn.trx() val trx2 = conn.trx() trx1.persist(counter.copy(counter = 40)) - delay(100) trx2.persist(counter.copy(counter = 70)) trx1.commit() trx2.commit() diff --git a/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt b/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt index 6aece635..67e6f470 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt @@ -31,10 +31,9 @@ class OperationalizationTest { @Test fun `Persisted counter should turn into difference`() { val counterEntity = IntCounterEntity(1, 10) - val updatedDb = emptyDb.with(factor(counterEntity, emptyDb::attr, gids)) + val updatedDb = emptyDb.with(factor(IntCounterEntity(1, 10), emptyDb::attr, gids)) - counterEntity.counter = 100 - val facts = operationalize(updatedDb, factor(counterEntity, updatedDb::attr, gids).entityFacts.values.first()) + val facts = operationalize(updatedDb, factor(counterEntity.copy(counter = 100), updatedDb::attr, gids).entityFacts.values.first()) assertEquals(1, facts.size, "Factoring of single entity with single attr should produce single fact") assertEquals("IntCounterEntity/counter", facts[0].attr) assertEquals(90, facts[0].value) diff --git a/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt b/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt index a880b44a..f274bc02 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt @@ -37,7 +37,7 @@ val testSchema = schema(internalTestsSerialModule) { entity(NullableRef::class) entity(IntEntity::class) entity(IntCounterEntity::class) { - counter(IntCounterEntity::counter) + intCounter(IntCounterEntity::counter) } entity(ResearchGroup::class) entity(EntityWithByteArray::class) diff --git a/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt b/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt index c470a022..92d65d54 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt @@ -176,19 +176,20 @@ class TrxTest { @JsName("Counter_test") @Test - fun `Counter test`() { + fun `Counter test`() { // TODO: find an appropriate place for this test runBlocking { val conn = setupTestData() val counterEntity = IntCounterEntity(1, 10) - val trx1 = conn.trx() - trx1.persist(counterEntity) - trx1.commit() - assertEquals(conn.db().pull(1)?.counter, counterEntity.counter) - counterEntity.counter = 90 - val trx2 = conn.trx() - trx2.persist(counterEntity) - trx2.commit() - assertEquals(conn.db().pull(1)?.counter, counterEntity.counter) + + conn.trx { + persist(counterEntity) + } + assertEquals(conn.db().pull(1)?.counter, 10) + + conn.trx { + persist(counterEntity.copy(counter = 90)) + } + assertEquals(conn.db().pull(1)?.counter, 90) } } diff --git a/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt b/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt index a631f920..79760478 100644 --- a/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt +++ b/qbit-test-fixtures/src/commonMain/kotlin/qbit/test/model/TestModels.kt @@ -10,7 +10,7 @@ data class TheSimplestEntity(val id: Long?, val scalar: String) data class IntEntity(val id: Long?, val int: Int) @Serializable -data class IntCounterEntity(val id: Long?, var counter: Int) +data class IntCounterEntity(val id: Long?, val counter: Int) @Serializable data class NullableIntEntity(val id: Long?, val int: Int?) From 158abb000dc1a4b9011c47ecdf80b6ee8a8e1735 Mon Sep 17 00:00:00 2001 From: Grigory Date: Sun, 1 May 2022 20:02:54 +0700 Subject: [PATCH 3/5] some feedback improvements --- .../src/commonMain/kotlin/qbit/api/model/DataTypes.kt | 8 ++++---- .../kotlin/qbit/resolving/ConflictResolving.kt | 9 --------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt index c89b086a..6aac5f00 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt @@ -37,8 +37,8 @@ sealed class DataType { fun ofCode(code: Byte): DataType<*>? = when(code) { in scalarRange -> values.firstOrNull { it.code == code } - in listRange -> values.map { it.list() }.firstOrNull { it.code == code } - in counterRange -> ofCode((code - 64).toByte())?.counter() + in listRange -> ofCode((code - listRange.first).toByte())?.list() + in counterRange -> ofCode((code - counterRange.first).toByte())?.counter() else -> null } @@ -93,13 +93,13 @@ sealed class DataType { data class QList(val itemsType: DataType) : DataType>() { - override val code = (32 + itemsType.code).toByte() + override val code = (listRange.first + itemsType.code).toByte() } data class QCounter(val primitiveType: DataType) : DataType() { - override val code = (64 + primitiveType.code).toByte() + override val code = (counterRange.first + primitiveType.code).toByte() } diff --git a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt index 4e889a1e..b0350e88 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt @@ -55,15 +55,6 @@ data class LogsDiff( } } - // This snippet is probably useless and should be wiped out - fun logBOperations(resolveAttrName: (String) -> Attr?): List { - return writesFromB.entries - .filter { DataType.ofCode(resolveAttrName(it.key.attr)!!.type)!!.isCounter() } - .flatMap { operationFromB -> - operationFromB.value.map { RawEntity(operationFromB.key.gid, listOf(it.eav)) } - } - } - private fun List.lastByTimestamp() = maxByOrNull { it.timestamp } From 5277797625433b09b39a5ac47a0ac5dad470ebbc Mon Sep 17 00:00:00 2001 From: Grigory Date: Sun, 1 May 2022 22:25:41 +0700 Subject: [PATCH 4/5] change nextEid type to counter, fix crdt loses from index --- .../kotlin/qbit/api/QbitSelfSchema.kt | 2 +- .../src/commonMain/kotlin/qbit/index/Index.kt | 43 +++++++++++++------ .../commonMain/kotlin/qbit/index/IndexDb.kt | 2 +- .../kotlin/qbit/trx/Operationalization.kt | 6 +-- .../commonTest/kotlin/qbit/TestUtilsCore.kt | 2 +- .../kotlin/qbit/typing/MappingTest.kt | 8 ++-- 6 files changed, 40 insertions(+), 23 deletions(-) diff --git a/qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt b/qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt index db1b1203..f8962390 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt @@ -51,7 +51,7 @@ object Instances { val nextEid = Attr( Gid(1, 5), "Instance/nextEid", - QInt.code, + QInt.counter().code, unique = false, list = false ) diff --git a/qbit-core/src/commonMain/kotlin/qbit/index/Index.kt b/qbit-core/src/commonMain/kotlin/qbit/index/Index.kt index ea9a5ea0..7ab74aba 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/index/Index.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/index/Index.kt @@ -2,6 +2,8 @@ package qbit.index import qbit.api.db.QueryPred import qbit.api.gid.Gid +import qbit.api.model.Attr +import qbit.api.model.DataType import qbit.api.model.Eav import qbit.api.tombstone import qbit.platform.assert @@ -61,17 +63,17 @@ class Index( } } - fun addFacts(facts: List): Index = - addFacts(facts as Iterable) + fun addFacts(facts: List, resolveAttr: (String) -> Attr<*>? = { null }): Index = + addFacts(facts as Iterable, resolveAttr) - fun addFacts(facts: Iterable): Index { + fun addFacts(facts: Iterable, resolveAttr: (String) -> Attr<*>? = { null }): Index { val entities = facts .groupBy { it.gid } .map { it.key to it.value } - return add(entities) + return add(entities, resolveAttr) } - fun add(entities: List): Index { + fun add(entities: List, resolveAttr: (String) -> Attr<*>? = { null }): Index { val newEntities = HashMap(this.entities) // eavs of removed or updated entities @@ -82,12 +84,27 @@ class Index( val (gid, eavs) = e val isUpdate = eavs[0].attr != tombstone.name - val obsoleteEntity = - if (isUpdate) { - newEntities.put(gid, e) - } else { - newEntities.remove(gid) - } + val obsoleteEntity = newEntities.get(gid) + + if (isUpdate) { + val crdts = obsoleteEntity?.second + ?.filter { + val attr = resolveAttr(it.attr) + if (attr == null) { + false + } else { + DataType.ofCode(attr.type)!!.isCounter() + } + } + ?.filter { + crdtEav -> eavs.none { it.attr == crdtEav.attr } + } + ?: emptyList() + obsoleteEavs.removeAll(crdts) + newEntities.put(gid, RawEntity(gid, eavs + crdts)) + } else { + newEntities.remove(gid) + } if (obsoleteEntity != null) { obsoleteEavs.addAll(obsoleteEntity.second) @@ -103,8 +120,8 @@ class Index( return Index(newEntities, newAveIndex) } - fun add(e: RawEntity): Index { - return add(listOf(e)) + fun add(e: RawEntity, resolveAttr: (String) -> Attr<*>? = { null }): Index { + return add(listOf(e), resolveAttr) } fun entityById(eid: Gid): Map>? = diff --git a/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt b/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt index 9a17dbc8..773343f7 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt @@ -32,7 +32,7 @@ class IndexDb( private val dataClassesCache = atomic>(LimitedPersistentMap(1024)) override fun with(facts: Iterable): IndexDb { - return IndexDb(index.addFacts(deoperationalize(this, facts.toList())), serialModule) + return IndexDb(index.addFacts(deoperationalize(this, facts.toList()), this::attr), serialModule) } override fun pullEntity(gid: Gid): StoredEntity? { diff --git a/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt b/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt index 5c2fe948..6d7554f3 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt @@ -39,9 +39,9 @@ private fun deoperationalizeCounter(db: InternalDb, fact: Eav): Eav { Eav( fact.gid, fact.attr, - if (previous is Byte && fact.value is Byte) previous + fact.value - else if (previous is Int && fact.value is Int) previous + fact.value - else if (previous is Long && fact.value is Long) previous + fact.value + if (fact.value is Byte) (previous as Number).toByte() + fact.value + else if (fact.value is Int) (previous as Number).toInt() + fact.value + else if (fact.value is Long) (previous as Number).toLong() + fact.value else throw QBitException("Unexpected counter value type for $fact") ) } else fact diff --git a/qbit-core/src/commonTest/kotlin/qbit/TestUtilsCore.kt b/qbit-core/src/commonTest/kotlin/qbit/TestUtilsCore.kt index 7df25f1b..feb7004e 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/TestUtilsCore.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/TestUtilsCore.kt @@ -55,7 +55,7 @@ internal object EmptyDb : InternalDb() { override fun attr(attr: String): Attr? = bootstrapSchema[attr] override fun with(facts: Iterable): InternalDb { - return IndexDb(Index().addFacts(facts), testsSerialModule) + return IndexDb(Index().addFacts(facts, this::attr), testsSerialModule) } } diff --git a/qbit-core/src/commonTest/kotlin/qbit/typing/MappingTest.kt b/qbit-core/src/commonTest/kotlin/qbit/typing/MappingTest.kt index 9c59adf6..3dfe7d37 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/typing/MappingTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/typing/MappingTest.kt @@ -41,7 +41,7 @@ class MappingTest { val db = createTestDb() val facts = factor(user, db::attr, gids) - val db2 = IndexDb(db.index.addFacts(facts), testsSerialModule) + val db2 = IndexDb(db.index.addFacts(facts, db::attr), testsSerialModule) val se = db2.pullEntity(facts.entityFacts[user]!!.first().gid)!! val fullUser = typify(db::attr, se, MUser::class, testsSerialModule) @@ -65,7 +65,7 @@ class MappingTest { ) val db = createTestDb() val facts = factor(user, db::attr, gids) - val db2 = IndexDb(db.index.addFacts(facts), testsSerialModule) + val db2 = IndexDb(db.index.addFacts(facts, db::attr), testsSerialModule) val se = db2.pullEntity(facts.entityFacts[user]!!.first().gid)!! val fullUser = typify(db::attr, se, MUser::class, testsSerialModule) @@ -331,7 +331,7 @@ class MappingTest { // When it's factorized, stored, pulled and typed val testDb = createTestDb() val facts = factor(bomb, testDb::attr, gids) - val db2 = IndexDb(testDb.index.addFacts(facts), testsSerialModule) + val db2 = IndexDb(testDb.index.addFacts(facts, testDb::attr), testsSerialModule) val se = db2.pullEntity(facts.entityFacts[bomb]!!.first().gid)!! val typedBomb = typify(testDb::attr, se, Bomb::class, testsSerialModule) @@ -356,7 +356,7 @@ class MappingTest { // When it's factorized, stored, pulled and typed val testDb = createTestDb() val facts = factor(bomb, testDb::attr, gids) - val db2 = IndexDb(testDb.index.addFacts(facts), testsSerialModule) + val db2 = IndexDb(testDb.index.addFacts(facts, testDb::attr), testsSerialModule) val se = db2.pullEntity(facts.entityFacts[bomb]!!.first().gid)!! var typedBomb = typify(testDb::attr, se, Bomb::class, testsSerialModule) From 8cd57b0b355eb740bee204774f5a9b137ea96e21 Mon Sep 17 00:00:00 2001 From: Grigory Date: Tue, 3 May 2022 19:53:47 +0700 Subject: [PATCH 5/5] finally make nextEid CRDT --- .../qbit/resolving/ConflictResolving.kt | 8 ++++---- .../qbit/resolving/ResolveConflictsTest.kt | 19 ------------------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt index b0350e88..81dbf0a1 100644 --- a/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt +++ b/qbit-core/src/commonMain/kotlin/qbit/resolving/ConflictResolving.kt @@ -1,7 +1,6 @@ package qbit.resolving import kotlinx.coroutines.flow.toList -import qbit.api.Instances import qbit.api.gid.Gid import qbit.api.model.Attr import qbit.api.model.DataType @@ -43,7 +42,10 @@ data class LogsDiff( } } return resolvingEavsByGid - .filter { !it.value.isEmpty() } + .filter { it.value.isNotEmpty() } // CRDT values in eavs are operations. + // Operations should not be created during merge + // So, it is possible to have "empty" entities there + // They should be filtered out .values.map { RawEntity(it.first().gid, it) } } @@ -68,8 +70,6 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr?): (Li ?: throw IllegalArgumentException("Cannot resolve ${eavsFromA[0].eav.attr}") when { - // temporary dirty hack until crdt counter or custom resolution strategy support is implemented - attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav) attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct() DataType.ofCode(attr.type)!!.isCounter() -> ArrayList() else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav) diff --git a/qbit-core/src/commonTest/kotlin/qbit/resolving/ResolveConflictsTest.kt b/qbit-core/src/commonTest/kotlin/qbit/resolving/ResolveConflictsTest.kt index c6a66ed9..6aec372b 100644 --- a/qbit-core/src/commonTest/kotlin/qbit/resolving/ResolveConflictsTest.kt +++ b/qbit-core/src/commonTest/kotlin/qbit/resolving/ResolveConflictsTest.kt @@ -1,11 +1,7 @@ package qbit.resolving import qbit.Attr -import qbit.api.Instances -import qbit.api.gid.Gid -import qbit.api.model.Eav import qbit.api.model.Hash -import qbit.api.model.nullHash import qbit.platform.runBlocking import qbit.serialization.NodeVal import kotlin.js.JsName @@ -67,19 +63,4 @@ class ResolveConflictsTest { assertEquals(eavA.value, result[0].second[0].value) } } - - @JsName("Test_last_writer_wins_resolving_for_nextEid_attribute") - @Test - fun `Test last writer wins resolving for nextEid attribute`(){ - runBlocking { - val resolveConflictForNextEidAttr = lastWriterWinsResolve { Instances.nextEid } - val eav1 = Eav(Gid(1,8), Instances.nextEid.name, 10) - val eav2 = Eav(Gid(1,8), Instances.nextEid.name, 11) - val result = resolveConflictForNextEidAttr( - listOf(PersistedEav(eav1, 11, nullHash)), - listOf(PersistedEav(eav2, 10, nullHash)) - ) - assertEquals(listOf(eav2), result) - } - } } \ No newline at end of file