diff --git a/block-storage/src/main/scala/coop/rchain/blockstorage/dag/BlockDagStorage.scala b/block-storage/src/main/scala/coop/rchain/blockstorage/dag/BlockDagStorage.scala index e2638ef9629..5314b8f964b 100644 --- a/block-storage/src/main/scala/coop/rchain/blockstorage/dag/BlockDagStorage.scala +++ b/block-storage/src/main/scala/coop/rchain/blockstorage/dag/BlockDagStorage.scala @@ -5,13 +5,17 @@ import coop.rchain.blockstorage.dag.BlockDagStorage.DeployId import coop.rchain.casper.protocol.{BlockMessage, DeployData} import coop.rchain.crypto.signatures.Signed import coop.rchain.models.BlockHash.BlockHash -import coop.rchain.models.BlockMetadata +import coop.rchain.models.{BlockMetadata, FringeData} trait BlockDagStorage[F[_]] { def getRepresentation: F[DagRepresentation] - def insert(blockMetadata: BlockMetadata, block: BlockMessage, isSync: Boolean = false): F[Unit] + def insert( + blockMetadata: BlockMetadata, + block: BlockMessage, + isSync: Boolean = false + ): F[Unit] def lookup(blockHash: BlockHash): F[Option[BlockMetadata]] diff --git a/block-storage/src/main/scala/coop/rchain/blockstorage/dag/Finalizer.scala b/block-storage/src/main/scala/coop/rchain/blockstorage/dag/Finalizer.scala index 21073e72655..10a7ca66ca8 100644 --- a/block-storage/src/main/scala/coop/rchain/blockstorage/dag/Finalizer.scala +++ b/block-storage/src/main/scala/coop/rchain/blockstorage/dag/Finalizer.scala @@ -31,7 +31,8 @@ final case class Message[M, S]( parents: Set[M], fringe: Set[M], // Cache of seen message ids - seen: View[S] + seen: View[S], + ejections: Set[S] = Set.empty[S] ) { override def hashCode(): Int = this.id.hashCode() } @@ -151,7 +152,14 @@ final case class Finalizer[M, S](msgMap: Map[M, Message[M, S]]) { def calculateFinalization( justifications: Set[Message[M, S]], bondsMap: Map[S, Long] - ): (Set[Message[M, S]], List[Set[Message[M, S]]]) = { + ): (Set[Message[M, S]], Either[Set[S], List[Set[Message[M, S]]]]) = { + def missingSenders(prevFringe: Set[Message[M, S]]): Set[S] = { + val minMsgs = justifications.toList.flatMap(p => (p +: selfParents(p, prevFringe)).lastOption) + // Include ancestors of minimum messages as next layer + val nextLayer = calculateNextLayer(minMsgs) + bondsMap.keySet -- nextLayer.keySet + } + // Calculate next fringe from previous fringe def nextFringe(prevFringe: Set[Message[M, S]]): Option[Set[Message[M, S]]] = for { @@ -177,10 +185,12 @@ final case class Finalizer[M, S](msgMap: Map[M, Message[M, S]]) { // Latest fringe seen from justifications val parentFringe = msgMap.latestFringe(justifications) + val missing = missingSenders(parentFringe) + // Find top most fringe // - multiple fringes can be finalized at once val newFringes = LazyList.unfold(parentFringe)(nextFringe(_).map(nf => (nf, nf))) - (parentFringe, newFringes.toList) + (parentFringe, missing.isEmpty.guard[Option].as(newFringes.toList).toRight(missing)) } } diff --git a/casper/src/main/resources/Pos.rhox b/casper/src/main/resources/Pos.rhox index 0e2b5336dda..e3e49b2f843 100644 --- a/casper/src/main/resources/Pos.rhox +++ b/casper/src/main/resources/Pos.rhox @@ -505,6 +505,59 @@ in { } } } | + // Private method which transfers all of a validator's rewards + bond to the Coop vault. + // Slash expects (Boolean, Either[Nil, String]) on returnCh. + contract PoS(@"eject", @ejectedValidator, @sysAuthToken, returnCh) = { + new isValidTokenCh, revAddressOps(`rho:rev:address`) in { + sysAuthTokenOps!("check", sysAuthToken, *isValidTokenCh) | + for (@isValid <- isValidTokenCh) { + if (isValid) { + new stateProcessCh in { + runMVar!(*stateCh, *stateProcessCh, *returnCh) | + for (@state, stateUpdateCh <- stateProcessCh) { + new valBondCh, valRewardCh, posAuthKeyCh, revAddrCh in { + // Takes the stake of slashed validator + // - transfer stake of ejected validator from PoS to validators vault + // - update state + valBondCh!(state.get("allBonds").get(ejectedValidator)) | + @RevVault!("unforgeableAuthKey", posVaultUnf, *posAuthKeyCh) | + revAddressOps!("fromPublicKey", ejectedValidator, *revAddrCh) | + for (@valBond <- valBondCh ; @posAuthKey <- posAuthKeyCh; @target <- revAddrCh) { + // Transfers ejected funds from PoS vault to validators vault, then updates state. + new transferDoneCh in { + @posVault!( + "transfer", + target, + valBond, + posAuthKey, + *transferDoneCh + ) | + // FIXME handle transfer failing case + for (_ <- transferDoneCh) { + // Sets slashed validator's committed rewards and bond to 0 in state. + // Moves slashed validator to withdrawers map with no quarantine period. + stateUpdateCh!({ + "allBonds" : state.get("allBonds").set(ejectedValidator, 0), + "activeValidators": state.get("activeValidators").delete(ejectedValidator), + "committedRewards" : state.get("committedRewards").delete(ejectedValidator), + "withdrawers" : state.get("withdrawers"), + "pendingWithdrawers": state.get("pendingWithdrawers"), + "randomImages" : state.get("randomImages"), + "randomNumbers" : state.get("randomNumbers") + }, + (true, Nil)) + } + } + } + } + } + } + } else { + returnCh!((false, "Invalid system auth token")) + } + } + } + } | // Private method which signals the end of block processing. contract PoS(@"closeBlock", @sysAuthToken, ackCh) = { new isValidTokenCh in { diff --git a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala index 0f9ab686135..313c3967dc0 100644 --- a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala +++ b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala @@ -20,6 +20,7 @@ import coop.rchain.models.syntax._ import coop.rchain.models.{BlockHash => _, _} import coop.rchain.rspace.hashing.Blake2b256Hash import coop.rchain.rspace.history.RadixTree +import coop.rchain.sdk.dag.View.IncludeTop import coop.rchain.sdk.error.FatalError import coop.rchain.sdk.syntax.all.mapSyntax import coop.rchain.shared._ @@ -49,9 +50,26 @@ object MultiParentCasper { // Get currently finalized bonds map val prevFringe = dag.dagMessageState.msgMap.latestFringe(lms) val prevFringeHashes = prevFringe.map(_.id) - if (prevFringe.isEmpty) - lms.head.bondsMap.pure[F] - else + + def hl(v: Validator, sN: Long): BlockHash = { + val l = dag.hashLookup((v, sN)) + assert(l.size == 1, "Equivocations are not supported") + l.head + } + val ejections = + dag.dagMessageState.msgMap + .between(parents, prevFringeHashes, hl, IncludeTop) + .flatMap(dag.dagMessageState.msgMap(_).ejections) + + if (prevFringe.isEmpty) { + val bondsMap = lms.head.bondsMap + Log + .log[F] + .info(s"using genesis bonds ${bondsMap.view.map { + case (v, s) => v.toHexString.take(6) -> s + }.toMap}") + .as(bondsMap) + } else for { // Calculate finalized fringe from justifications // Previous fringe state should be present (loaded from BlockMetadata store) @@ -74,7 +92,7 @@ object MultiParentCasper { .info(s"bonds in ${prevFringeStateHash.toBlake2b256Hash} ${bondsMap.view.map { case (v, s) => v.toHexString.take(6) -> s }.toMap}") - } yield bondsMap + } yield bondsMap -- ejections } def getPreStateForNewBlock[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log] @@ -122,90 +140,113 @@ object MultiParentCasper { bondsMap <- bondsMap(dag, parentHashes) finalizer = Finalizer(dag.dagMessageState.msgMap) - (_, newFringesFound) = finalizer + (_, either) = finalizer .calculateFinalization(parents, bondsMap) + newFringesFound = either.getOrElse(List()) + missing = either.swap.getOrElse(Set()) + conflictSet = dag.dagMessageState.msgMap + .between( + parentHashes, + prevFringeHashes, + (v, sN) => dag.hashLookup.getUnsafe(v -> sN).head, + IncludeTop + ) + toEject = if (conflictSet.size > (bondsMap.size ^ 3)) missing else Set.empty[Validator] + _ <- Log[F].info(s"Ejecting ${toEject.map(_.toHexString.take(8))}") + _ <- Log[F] .info(s"Found ${newFringesFound.size} new fringes: ${newFringesFound .map(_.map(_.id.toHexString.take(8)).toList.sorted)}") .whenA(newFringesFound.nonEmpty) // If new fringe is finalized, merge it - newFringeResult <- newFringesFound.nonEmpty - .guard[Option] - .as(newFringesFound.map(_.map(_.id))) - .traverse { fringes => - def doMerge( - prevFringeStateHash: Blake2b256Hash, - prevFringe: Set[BlockHash], - fringe: Set[BlockHash] - ): F[ - (Blake2b256Hash, Set[ByteString], Set[ByteString]) - ] = { - val mergeFringe = { - val (mScope, _) = - MergeScope.fromDag( - fringe, - prevFringe, - dag.childMap, - msgMap, - dag.hashLookup.getUnsafe - ) - val checkGenesisCase = - if (prevFringe.isEmpty) { - // genesis case - val genesisHash = dag.heightMap.getUnsafe(0).head - BlockStore[F] - .getUnsafe(genesisHash) - .map( - mScope.copy( - conflictScope = mScope.conflictScope - genesisHash - ) -> _.postStateHash.toBlake2b256Hash - ) - } else (mScope, prevFringeStateHash).pure - checkGenesisCase.flatMap { - case (mScope1, prevFringeStateHash1) => - MergeScope - .merge( - mScope1, - prevFringeStateHash1, - dag.fringeStates, - RuntimeManager[F].getHistoryRepo, - BlockIndex.getBlockIndex[F](_) - ) - .map(_ -> mScope.conflictScope) + fringesFoundDatas <- newFringesFound.nonEmpty + .guard[Option] + .as(newFringesFound.map(_.map(_.id))) + .traverse { fringes => + def doMerge( + accFringeDatas: List[FringeData], + prevFringeStateHash: Blake2b256Hash, + prevFringe: Set[BlockHash], + fringe: Set[BlockHash] + ): F[FringeData] = { + val mergeFringe = { + val (mScope, _) = + MergeScope.fromDag( + fringe, + prevFringe, + dag.childMap, + msgMap, + dag.hashLookup.getUnsafe + ) + val checkGenesisCase = + if (prevFringe.isEmpty) { + // genesis case + val genesisHash = dag.heightMap.getUnsafe(0).head + BlockStore[F] + .getUnsafe(genesisHash) + .map( + mScope.copy( + conflictScope = mScope.conflictScope - genesisHash + ) -> _.postStateHash.toBlake2b256Hash + ) + } else (mScope, prevFringeStateHash).pure + checkGenesisCase.flatMap { + case (mScope1, prevFringeStateHash1) => + MergeScope + .merge( + mScope1, + prevFringeStateHash1, + dag.fringeStates ++ accFringeDatas + .map(x => x.fringe -> x), + RuntimeManager[F].getHistoryRepo, + BlockIndex.getBlockIndex[F](_) + ) + .map { + case (finalizedState, _, rejected) => + FringeData( + FringeData.fringeHash(fringe), + fringe, + finalizedState, + rejected + ) -> mScope.conflictScope + } + } + } + mergeFringe.flatMap { + case (result @ FringeData(_, _, finalizedState, rejected)) -> cScope => + val msgFinalized = + s"New finalized fringe. " + + s"${prevFringe.map(_.toHexString.take(8)).toList.sorted} @ $prevFringeStateHash => " + + s"${fringe.map(_.toHexString.take(8)).toList.sorted} @ $finalizedState. " + + s"ConflictScope: ${cScope.map(_.toHexString.take(8)).toList.sorted}. " + + s"RejectedDeploys: ${rejected.map(_.toHexString.take(8)).toList.sorted}. " //+ + //s"MergedDeploys: ${merged.map(_.toHexString.take(8)).toList.sorted}." + Log[F].info(msgFinalized).as(result) } } - mergeFringe.flatMap { - case result -> cScope => - val (finalizedState, merged, rejected) = result - val msgFinalized = - s"New finalized fringe. " + - s"${prevFringeHashes.map(_.toHexString.take(8)).toList.sorted} @ $prevFringeState => " + - s"${fringe.map(_.toHexString.take(8)).toList.sorted} @ $finalizedState. " + - s"ConflictScope: ${cScope.map(_.toHexString.take(8)).toList.sorted}. " + - s"RejectedDeploys: ${rejected.map(_.toHexString.take(8)).toList.sorted}. " + - s"MergedDeploys: ${merged.map(_.toHexString.take(8)).toList.sorted}." - Log[F].info(msgFinalized).as(result) - } - } - fringes.foldM( - ( - prevFringeState, - prevFringeHashes, - Set.empty[ByteString], - Set.empty[ByteString] - ) - ) { - case ((prevFS, prevF, mjAcc, rjAcc), newF) => - doMerge(prevFS, prevF, newF).map { - case (newSt, mj, rj) => (newSt, newF, mjAcc ++ mj, rjAcc ++ rj) + fringes + .foldM( + ( + prevFringeState, + prevFringeHashes, + List.empty[FringeData] + ) + ) { + case ((prevFS, prevF, fdAcc), newF) => + doMerge(fdAcc, prevFS, prevF, newF).map { + case x @ FringeData(_, _, newSt, _) => + (newSt, newF, fdAcc :+ x) + } } + .map(_._3) } - } - (fringeState, newFringe, finMergedDeploys, finRejectedDeploys) = newFringeResult getOrElse (prevFringeState, prevFringeHashes, Set - .empty[ByteString], Set.empty[ByteString]) + newFringe = fringesFoundDatas.flatMap(_.lastOption.map(_.fringe)).getOrElse(prevFringeHashes) + fringeState = fringesFoundDatas + .flatMap(_.lastOption.map(_.stateHash)) + .getOrElse(prevFringeState) maxHeight = justifications.map(_.blockNum).maximumOption.getOrElse(-1L) maxSeqNums = justifications.map(m => (m.sender, m.seqNum)).toMap @@ -237,14 +278,18 @@ object MultiParentCasper { checkGenesisCase .flatMap { case (mScope1, prevFringeStateHash1) => - MergeScope - .merge( - mScope1, - prevFringeStateHash1, - dag.fringeStates, - RuntimeManager[F].getHistoryRepo, - BlockIndex.getBlockIndex[F](_) - ) + BlockDagStorage[F].getRepresentation.flatMap { d => + MergeScope + .merge( + mScope1, + prevFringeStateHash1, + d.fringeStates ++ fringesFoundDatas + .getOrElse(List()) + .map(x => x.fringe -> x), + RuntimeManager[F].getHistoryRepo, + BlockIndex.getBlockIndex[F](_) + ) + } } .map(_ -> mScope.conflictScope) .flatMap { @@ -268,10 +313,11 @@ object MultiParentCasper { maxSeqNums, fringe = newFringe, fringeState = fringeState, - fringeBondsMap = bondsMap, - fringeRejectedDeploys = finRejectedDeploys, + bonds = bondsMap, + foundFringes = fringesFoundDatas.getOrElse(List()), preStateHash = preStateHash, - rejectedDeploys = csRejectedDeploys + rejectedDeploys = csRejectedDeploys, + toEject = toEject ) def validate[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Metrics: Span]( diff --git a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala index f42cb121203..6605f4f0cc1 100644 --- a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala +++ b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala @@ -111,7 +111,8 @@ object BlockApi { blockSize = block.toProto.serializedSize.toString, deployCount = block.state.deploys.length, justifications = block.justifications.map(PrettyPrinter.buildStringNoLimit), - rejectedDeploys = block.rejectedDeploys.toSeq.map(PrettyPrinter.buildStringNoLimit), + rejectedDeploys = + block.fringes.flatMap(_.rejectedDeploys).map(PrettyPrinter.buildStringNoLimit), view = block.view.seen.map { case (v, r) => ViewInfo(v.toHexString, r.start.toLong, r.end.toLong) }.toList diff --git a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala index 48c68c3f734..bfeaa7c42bf 100644 --- a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala +++ b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala @@ -9,7 +9,7 @@ import coop.rchain.blockstorage.dag.BlockDagStorage.DeployId import coop.rchain.casper.merging.ParentsMergedState import coop.rchain.casper.protocol.{ProcessedDeploy, ProcessedSystemDeploy, RholangState} import coop.rchain.casper.rholang.RuntimeManager.StateHash -import coop.rchain.casper.rholang.sysdeploys.{CloseBlockDeploy, SlashDeploy} +import coop.rchain.casper.rholang.sysdeploys.{CloseBlockDeploy, EjectDeploy, SlashDeploy} import coop.rchain.casper.rholang.{BlockRandomSeed, InterpreterUtil, RuntimeManager} import coop.rchain.casper.syntax.casperSyntaxRuntimeManager import coop.rchain.casper.util.ProtoUtil @@ -34,7 +34,7 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { ): F[BlockCreatorResult] = { val preStateHash = preState.preStateHash val parents = preState.justifications.map(_.blockHash) - val bondsMap = preState.fringeBondsMap + val bondsMap = preState.bonds val blockNum = preState.justifications.map(_.blockNum).max + 1 val creatorsPk = id.publicKey val creatorsId = creatorsPk.bytes.toByteString @@ -44,7 +44,7 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { val shouldPropose = deploys.nonEmpty || toSlash.nonEmpty || changeEpoch // deploys that are rejected on finalization done by the block being created - val finalization = preState.fringeRejectedDeploys + val finalization = preState.foundFringes.flatMap(_.rejectedDeploys).toSet def propose: F[StateTransitionResult] = { val rand = BlockRandomSeed.randomGenerator(shardId, blockNum, creatorsPk, preStateHash) @@ -56,9 +56,18 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { toSlash.toList.sorted.zip(slashSeeds).map(SlashDeploy.tupled) } + val ejectDeploys = { + // seeds from 0 to deploys.size are used in deploys execution, so system deploy seeds start from the next index + val seeds = + (0 until preState.toEject.size) + .map(_ + deploys.size + toSlash.size) + .map(i => rand.splitByte(i.toByte)) + preState.toEject.toList.sorted.zip(seeds).map(EjectDeploy.tupled) + } + // Close block using rholang only if rholang state has been adjusted by a block val closeDeployOpt = (slashDeploys.nonEmpty || deploys.nonEmpty).guard[Option].as { - val closeSeed = rand.splitByte((deploys.size + toSlash.size).toByte) + val closeSeed = rand.splitByte((deploys.size + toSlash.size + toSlash.size).toByte) CloseBlockDeploy(closeSeed) } @@ -111,11 +120,11 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { postStateHash, parents.toList, bondsMap, - finalization, state, view, preState.fringe.toList.sorted, - mergeables + mergeables, + preState.foundFringes ) // Sign a block (hash should not be changed) diff --git a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala index 81b96c9ebd0..0998dec32ca 100644 --- a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala +++ b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala @@ -159,7 +159,7 @@ object Proposer { nextSeqNum = creatorsLatestOpt.map(_.seqNum + 1).getOrElse(0L) nextBlockNum = preState.justifications.map(_.blockNum).max + 1 parentHashes = preState.justifications.map(_.blockHash) - finalBonds = preState.fringeBondsMap + finalBonds = preState.bonds offenders = preState.justifications.filter(_.validationFailed).map(_.sender) // slashing preStateBonds <- RuntimeManager[F].computeBonds(preStateHash.toByteString) diff --git a/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala b/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala index eb7705c701d..a00da756cd2 100644 --- a/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala +++ b/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala @@ -11,7 +11,7 @@ import coop.rchain.blockstorage.dag.codecs._ import coop.rchain.blockstorage.syntax._ import coop.rchain.casper.dag.BlockDagKeyValueStorage._ import coop.rchain.casper.merging.BlockIndex -import coop.rchain.casper.protocol.{BlockMessage, DeployData} +import coop.rchain.casper.protocol.{BlockMessage, DeployData, EjectSystemDeployData} import coop.rchain.casper.{MultiParentCasper, PrettyPrinter} import coop.rchain.crypto.signatures.Signed import coop.rchain.metrics.Metrics.Source @@ -45,6 +45,7 @@ final class BlockDagKeyValueStorage[F[_]: Async: Log] private ( def getRepresentation: F[DagRepresentation] = representationState.get + // TODO store FringeData from PreState override def insert( blockMetadata: BlockMetadata, block: BlockMessage, @@ -63,7 +64,7 @@ final class BlockDagKeyValueStorage[F[_]: Async: Log] private ( _ <- deployIndex.put(block.state.deploys.map(_.deploy.sig).map(_ -> block.blockHash)) // Add fringe data - fringeHash = FringeData.fringeHash(blockMetadata.fringe) +// fringeHash = FringeData.fringeHash(blockMetadata.fringe) // Calculate blocks included in the fringe // justificationsMsgs = blockMetadata.justifications.map(dagState.msgMap) // prevFringeMsgs = dagState.msgMap.latestFringe(justificationsMsgs) @@ -85,32 +86,35 @@ final class BlockDagKeyValueStorage[F[_]: Async: Log] private ( // fringeDiffMetas <- fringeDiffHashes.toList.traverse(blockMetadataIndex.getUnsafe) // fringeDiffMetasUpdated = fringeDiffMetas.map(_.copy(memberOfFringe = fringeHash.some)) // _ <- fringeDiffMetasUpdated.traverse(blockMetadataIndex.add) - metadataStateHash = blockMetadata.fringeStateHash.toBlake2b256Hash - fringeData = FringeData( - fringeHash, - fringe = blockMetadata.fringe, - //fringeDiff = fringeDiffHashes, - stateHash = metadataStateHash, - rejectedDeploys = block.rejectedDeploys, - rejectedBlocks = block.rejectedBlocks, - rejectedSenders = block.rejectedSenders - ) - // Save to fringe data store - shouldSave <- (!blockMetadata.validationFailed).pure &&^ fringeDataStore - .get1(fringeHash) - .flatMap { - case Some(fd) => - FatalError( - s"Attempt do add block with equivocating fringe state hash. " + - s"Fringe: ${blockMetadata.fringe.map(_.toHexString.take(6))}, " + - s"persisted: ${fd.stateHash}, " + - s"attempting to add: $metadataStateHash." - ).raiseError - .whenA(fd.stateHash != metadataStateHash) - .as(false) - case None => true.pure - } - _ <- fringeDataStore.put(fringeHash, fringeData).whenA(shouldSave) +// metadataStateHash = blockMetadata.fringeStateHash.toBlake2b256Hash +// fringeData = FringeData( +// fringeHash, +// fringe = blockMetadata.fringe, +// //fringeDiff = fringeDiffHashes, +// stateHash = metadataStateHash, +// rejectedDeploys = block.rejectedDeploys +// ) + + _ <- block.fringes.traverse { fringeData => + for { + // Save to fringe data store + shouldSave <- (!blockMetadata.validationFailed).pure &&^ fringeDataStore + .get1(fringeData.fringeHash) + .flatMap { + case Some(fd) => + FatalError( + s"Attempt do add block with equivocating fringe state hash. " + + s"Fringe: ${blockMetadata.fringe.map(_.toHexString.take(6))}, " + + s"persisted: ${fd.stateHash}, " + + s"attempting to add: ${fringeData.stateHash}." + ).raiseError + .whenA(fd.stateHash != fringeData.stateHash) + .as(false) + case None => true.pure + } + _ <- fringeDataStore.put(fringeData.fringeHash, fringeData).whenA(shouldSave) + } yield () + } // Update in-mem indices dag <- representationState.get @@ -126,8 +130,10 @@ final class BlockDagKeyValueStorage[F[_]: Async: Log] private ( (msg.sender, msg.senderSeq), dr.hashLookup.getOrElse((msg.sender, msg.senderSeq), Set()) + msg.id ) - val newFringes = dr.fringeStates + ((msg.fringe, fringeData)) - val newDagSet = dr.dagSet + msg.id + val newFringes = block.fringes.foldLeft(dr.fringeStates) { + case (acc, fd) => acc + (fd.fringe -> fd) + } + val newDagSet = dr.dagSet + msg.id val newChildMap = msg.parents.foldLeft(dr.childMap) { case (acc, p) => acc.updated(p, acc.get(p).map(_ + msg.id).getOrElse(Set(msg.id))) } @@ -576,7 +582,8 @@ object BlockDagKeyValueStorage { bondsMap = block.bondsMap, parents = block.justifications, fringe = block.fringe, - seen = block.view + seen = block.view, + ejections = block.ejections ) private def removeExpiredFromPool[F[_]: Monad]( diff --git a/casper/src/main/scala/coop/rchain/casper/genesis/Genesis.scala b/casper/src/main/scala/coop/rchain/casper/genesis/Genesis.scala index fd0917498f8..10d5d7f9b2c 100644 --- a/casper/src/main/scala/coop/rchain/casper/genesis/Genesis.scala +++ b/casper/src/main/scala/coop/rchain/casper/genesis/Genesis.scala @@ -12,7 +12,8 @@ import coop.rchain.casper.util.ProtoUtil.unsignedBlockProto import coop.rchain.casper.{PrettyPrinter, ValidatorIdentity} import coop.rchain.crypto.PublicKey import coop.rchain.crypto.signatures.Signed -import coop.rchain.models.BlockVersion +import coop.rchain.models.syntax.modelsSyntaxByteString +import coop.rchain.models.{BlockVersion, FringeData} import coop.rchain.rholang.interpreter.SystemProcesses.BlockData import coop.rchain.rspace.hashing.Blake2b256Hash import coop.rchain.sdk.dag.View @@ -126,11 +127,18 @@ object Genesis { postStateHash = postStateHash, justifications = List.empty, bonds = buildBondsMap(genesis.proofOfStake), - rejectedDeploys = Set.empty, state = state, view = View.semigroupDagSeen.empty, fringe = List.empty, - mergeables + mergeables, + List( + FringeData( + FringeData.fringeHash(Set()), + Set(), + RuntimeManager.emptyStateHashFixed.toBlake2b256Hash, + Set() + ) + ) ) } diff --git a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala index b2f439bb6f1..e152d74290e 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala @@ -103,6 +103,11 @@ object MergeScope { (MergeScope(fScopeIds, cScopeIds /*-- baseMsg.toSet*/ ), baseMsg) } + implicit val showDCI = new Show[DeployChainIndex] { + override def show(t: DeployChainIndex): String = + t.deploysWithCost.map(_.id.toHexString.take(8)).mkString(";") + } + def merge[F[_]: Async: Log]( mergeScope: MergeScope, baseState: Blake2b256Hash, @@ -137,14 +142,42 @@ object MergeScope { ) // TODO conflictsMap and dependentsMap computations are expensive // can be cached and updated on new block added | new fringe finalized + val int = conflictSet.map(_.deploysWithCost.map(_.id)) intersect finalSet.map( + _.deploysWithCost.map(_.id) + ) + assert( + int.isEmpty, + s"conflictSet intersects finalSet ${int.map(_.map(_.toHexString.take(8)))}" + ) val (conflictsMap, dependencyMap) = ConflictResolutionLogic.computeRelationMapForMergeSet( conflictSet, finalSet, DeployChainIndex.deploysAreConflicting, DeployChainIndex.depends ) - val resolveConflicts = loadInitMergeableValues.map { initMergeableValues => - ConflictResolutionLogic.resolveConflictSet[DeployChainIndex, Blake2b256Hash]( + val resolveConflicts = Log[F].info( + s"conflictsMap: \n${conflictsMap + .map { + case (k, v) => + k.deploysWithCost.map(_.id.toHexString.take(8)).toList.sorted.mkString(">") -> v + .map(_.deploysWithCost.map(_.id.toHexString.take(8)).toList.sorted.mkString(">")) + .toList + .sorted + } + .map { case (k, v) => s"$k --- ${v.mkString("&")}" } + .mkString("\n")}" ++ + s"\ndependsMap: \n${dependencyMap + .map { + case (k, v) => + k.deploysWithCost.map(_.id.toHexString.take(8)).toList.sorted.mkString(">") -> v + .map(_.deploysWithCost.map(_.id.toHexString.take(8)).toList.sorted.mkString(">")) + .toList + .sorted + } + .map { case (k, v) => s"$k --- ${v.mkString("&")}" } + .mkString("\n")}" + ) *> loadInitMergeableValues.flatMap { initMergeableValues => + ConflictResolutionLogic.resolveConflictSet[F, DeployChainIndex, Blake2b256Hash]( conflictSet = conflictSet, acceptedFinally = acceptedFinally.map(_._2).toSet, rejectedFinally = rejectedFinally.map(_._2).toSet, @@ -154,18 +187,23 @@ object MergeScope { dependencyMap = dependencyMap, // support for mergeable mergeableDiffs = mergeableDiffsMap, - initMergeableValues = initMergeableValues + initMergeableValues = initMergeableValues, + Log[F].info(_) ) } resolveConflicts.flatMap { case (toMerge, rejected) => - computeMergedState(toMerge, baseState, historyRepository).map { newState => - ( - newState, - toMerge.flatMap(_.deploysWithCost.map(_.id)), - rejected.flatMap(_.deploysWithCost.map(_.id)) - ) - } + Log[F].info( + s"toMerge ${toMerge.map(_.deploysWithCost.map(_.id.toHexString.take(8)))} " + + s"toReject ${rejected.map(_.deploysWithCost.map(_.id.toHexString.take(8)))}" + ) *> + computeMergedState(toMerge, baseState, historyRepository).map { newState => + ( + newState, + toMerge.flatMap(_.deploysWithCost.map(_.id)), + rejected.flatMap(_.deploysWithCost.map(_.id)) + ) + } } } } diff --git a/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala b/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala index a8a53ac13af..eecf1a1ac02 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala @@ -2,7 +2,7 @@ package coop.rchain.casper.merging import com.google.protobuf.ByteString import coop.rchain.models.BlockHash.BlockHash -import coop.rchain.models.BlockMetadata +import coop.rchain.models.{BlockMetadata, FringeData} import coop.rchain.models.Validator.Validator import coop.rchain.rspace.hashing.Blake2b256Hash @@ -15,7 +15,7 @@ import coop.rchain.rspace.hashing.Blake2b256Hash * @param maxSeqNums latest sequence numbers for bonded validators * @param fringe finalized fringe seen (finalized) by parents * @param fringeState finalized fringe (merged) state - * @param fringeBondsMap bonds map of validators on finalized fringe state + * @param bonds bonds map observed by message * @param fringeRejectedDeploys rejected deploys from blocks finalized with [[fringe]] blocks * @param preStateHash state hash after non-finalized blocks are merged * @param rejectedDeploys rejected deploys after non-finalized blocks are merged @@ -27,9 +27,10 @@ final case class ParentsMergedState( // Fringe merged state fringe: Set[BlockHash], fringeState: Blake2b256Hash, - fringeBondsMap: Map[Validator, Long], - fringeRejectedDeploys: Set[ByteString], + bonds: Map[Validator, Long], + foundFringes: List[FringeData], // Conflict scope state (non-finalized blocks) preStateHash: Blake2b256Hash, - rejectedDeploys: Set[ByteString] + rejectedDeploys: Set[ByteString], + toEject: Set[ByteString] ) diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala b/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala index 596d0c4229c..ac234e3afc0 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala @@ -25,7 +25,7 @@ import coop.rchain.models.BlockHash.BlockHash import coop.rchain.models.NormalizerEnv.ToEnvMap import coop.rchain.models.Validator.Validator import coop.rchain.models.syntax._ -import coop.rchain.models.{BlockMetadata, NormalizerEnv, Par} +import coop.rchain.models.{BlockMetadata, FringeData, NormalizerEnv, Par} import coop.rchain.rholang.interpreter.SystemProcesses.BlockData import coop.rchain.rholang.interpreter.compiler.Compiler import coop.rchain.rholang.interpreter.errors.InterpreterError @@ -74,32 +74,31 @@ object InterpreterUtil { fringe = Set[BlockHash](), fringeState = genesisPreStateHash, // TODO: validate with data from bonds file - fringeBondsMap = block.bonds, - fringeRejectedDeploys = Set[ByteString](), + bonds = block.bonds, + foundFringes = List[FringeData](), // TODO: validate with data from config (genesis block number) maxBlockNum = 0L, // TODO: validate with sender in bonds map maxSeqNums = Map[Validator, Long](block.sender -> 0L), // TODO: validate genesis post-state hash preStateHash = genesisPreStateHash, - rejectedDeploys = Set() + rejectedDeploys = Set(), + toEject = Set() ).pure[F] } computedPreStateHash = preState.preStateHash.toByteString - rejectedDeployIds = preState.fringeRejectedDeploys + foundFringes = preState.foundFringes result <- { val incomingPreStateHash = block.preStateHash - if (rejectedDeployIds != block.rejectedDeploys) { - // TODO: if rejected deploys are different that almost certain - // hashes doesn't match also so this branch is unreachable + if (foundFringes != block.fringes) { Log[F] .warn( - s"Computed rejected deploys " + - s"[${rejectedDeployIds.map(PrettyPrinter.buildString).mkString(",")}] does not equal " + - s"block's rejected deploy " + - s"[${block.rejectedDeploys.map(PrettyPrinter.buildString).mkString(",")}]" + s"Computed fringes " + + s"[${foundFringes.map(_.show).mkString(", ")}] does not equal " + + s"block's fringes " + + s"[${block.fringes.map(_.show).mkString(",")}]" ) .as(InvalidRejectedDeploy.asLeft) } else { @@ -134,11 +133,12 @@ object InterpreterUtil { val bmd = BlockMetadata .fromBlock(block) .copy( - bondsMap = preState.fringeBondsMap, + bondsMap = preState.bonds, validated = true, validationFailed = result.isLeft || !result.toOption.get, fringe = preState.fringe, - fringeStateHash = preState.fringeState.bytes.toArray.toByteString + fringeStateHash = preState.fringeState.bytes.toArray.toByteString, + ejections = preState.toEject ) (bmd, result) } diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala index e8292986e35..6df307972d9 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala @@ -6,6 +6,7 @@ import cats.syntax.all._ import coop.rchain.casper.CasperMetricsSource import coop.rchain.casper.protocol.{ CloseBlockSystemDeployData, + EjectSystemDeployData, Empty, ProcessedDeploy, ProcessedSystemDeploy, @@ -15,6 +16,7 @@ import coop.rchain.casper.rholang.InterpreterUtil.printDeployErrors import coop.rchain.casper.rholang.syntax.RuntimeSyntax.SysEvalResult import coop.rchain.casper.rholang.sysdeploys.{ CloseBlockDeploy, + EjectDeploy, PreChargeDeploy, RefundDeploy, SlashDeploy @@ -290,6 +292,18 @@ final class RuntimeReplayOps[F[_]](private val runtime: ReplayRhoRuntime[F]) ext runtime.getNumberChannelsData(er.mergeable).map((_, er)) } ).map(_._1) + case EjectSystemDeployData(ejectedValidator) => + val ejectDeploy = { + EjectDeploy(ejectedValidator, rand) + } + rigWithCheck( + processedSysDeploy, + replaySystemDeployInternal(ejectDeploy, none).semiflatMap { + case (_, er) => + runtime.createSoftCheckpoint.whenA(er.succeeded) *> + runtime.getNumberChannelsData(er.mergeable).map((_, er)) + } + ).map(_._1) case CloseBlockSystemDeployData => val closeBlockDeploy = CloseBlockDeploy(rand) rigWithCheck( diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/EjectDeploy.scala b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/EjectDeploy.scala new file mode 100644 index 00000000000..31e39cdc7f3 --- /dev/null +++ b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/EjectDeploy.scala @@ -0,0 +1,62 @@ +package coop.rchain.casper.rholang.sysdeploys + +import coop.rchain.casper.rholang.types.{SystemDeploy, SystemDeployUserError} +import coop.rchain.crypto.hash.Blake2b512Random +import coop.rchain.models.NormalizerEnv.{Contains, ToEnvMap} +import coop.rchain.models.Validator.Validator +import coop.rchain.models.rholang.RhoType._ + +final case class EjectDeploy( + ejectedValidator: Validator, + initialRand: Blake2b512Random +) extends SystemDeploy(initialRand) { + import coop.rchain.models._ + import Expr.ExprInstance._ + import rholang.{implicits => toPar} + import shapeless._ + import shapeless.syntax.singleton._ + + type Output = (RhoBoolean, Either[RhoString, RhoNil]) + type Result = Unit + + val `sys:casper:ejectedValidator` = Witness("sys:casper:ejectedValidator") + type `sys:casper:ejectedValidator` = `sys:casper:ejectedValidator`.T + + type Env = + (`sys:casper:ejectedValidator` ->> GByteArray) :: (`sys:casper:authToken` ->> GSysAuthToken) :: (`sys:casper:return` ->> GUnforgeable) :: HNil + + import toPar._ + protected override val envsReturnChannel = Contains[Env, `sys:casper:return`] + protected override val toEnvMap = ToEnvMap[Env] + protected override val normalizerEnv = new NormalizerEnv( + ("sys:casper:ejectedValidator" ->> GByteArray(ejectedValidator)) + :: mkSysAuthToken + :: mkReturnChannel + :: HNil + ) + + override val source: String = + """#new rl(`rho:registry:lookup`), + # poSCh, + # ejectedValidator(`sys:casper:ejectedValidator`), + # sysAuthToken(`sys:casper:authToken`), + # return(`sys:casper:return`) + #in { + # rl!(`rho:rchain:pos`, *poSCh) | + # for(@(_, Pos) <- poSCh) { + # @Pos!("eject", *ejectedValidator, *sysAuthToken, *return) + # } + #}""".stripMargin('#') + + protected override val extractor = Extractor.derive + + protected def processResult( + value: (Boolean, Either[String, Unit]) + ): Either[SystemDeployUserError, Unit] = + value match { + case (true, _) => Right(()) + case (false, Left(errorMsg)) => Left(SystemDeployUserError(errorMsg)) + case _ => Left(SystemDeployUserError("ejecting failed unexpectedly")) + } + +} diff --git a/casper/src/main/scala/coop/rchain/casper/util/ProtoUtil.scala b/casper/src/main/scala/coop/rchain/casper/util/ProtoUtil.scala index d422ec36ea9..7c5ef29b83c 100644 --- a/casper/src/main/scala/coop/rchain/casper/util/ProtoUtil.scala +++ b/casper/src/main/scala/coop/rchain/casper/util/ProtoUtil.scala @@ -44,11 +44,11 @@ object ProtoUtil { postStateHash: ByteString, justifications: List[BlockHash], bonds: Map[Validator, Long], - rejectedDeploys: Set[ByteString], state: RholangState, view: View[Validator], fringe: List[BlockHash], - mergeables: Seq[Map[Blake2b256Hash, Long]] + mergeables: Seq[Map[Blake2b256Hash, Long]], + fringeDatas: List[FringeData] ): BlockMessage = { val block = BlockMessage( version, @@ -62,8 +62,7 @@ object ProtoUtil { postStateHash = postStateHash, justifications, bonds, - rejectedDeploys, - rejectedBlocks = Set(), + fringes = fringeDatas, rejectedSenders = Set(), state, // Signature algorithm is now part of the block hash diff --git a/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala b/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala index f6e9b8359d4..64c007b4739 100644 --- a/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala +++ b/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala @@ -17,7 +17,7 @@ import coop.rchain.casper.syntax._ import coop.rchain.casper.util.ConstructDeploy import coop.rchain.metrics.{Metrics, Span} import coop.rchain.models.BlockHash.BlockHash -import coop.rchain.models.BlockMetadata +import coop.rchain.models.{BlockMetadata, FringeData} import coop.rchain.models.Validator.Validator import coop.rchain.models.block.StateHash._ import coop.rchain.models.blockImplicits.getRandomBlock @@ -41,11 +41,12 @@ object BlockGenerator { maxSeqNums = Map.empty, fringe = Set(), fringeState = RuntimeManager.emptyStateHashFixed.toBlake2b256Hash, - fringeBondsMap = Map.empty, - fringeRejectedDeploys = Set(), + bonds = Map.empty, + foundFringes = List.empty[FringeData], // Pre-state is the same as fringe state preStateHash = RuntimeManager.emptyStateHashFixed.toBlake2b256Hash, - rejectedDeploys = Set() + rejectedDeploys = Set(), + toEject = Set() ) def step[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log: Metrics: Span]( diff --git a/models/src/main/protobuf/CasperMessage.proto b/models/src/main/protobuf/CasperMessage.proto index 90ba9b67fcf..352db874fbe 100644 --- a/models/src/main/protobuf/CasperMessage.proto +++ b/models/src/main/protobuf/CasperMessage.proto @@ -101,8 +101,7 @@ message BlockMessageProto { repeated bytes justifications = 9 [(scalapb.field).collection_type="List"]; // map of all validators to latest blocks based on current view repeated BondProto bonds = 10 [(scalapb.field).collection_type="List"]; // validators and their stakes // Rejected data in finalized fringe - repeated bytes rejectedDeploys = 11 [(scalapb.field).collection_type="List"]; - repeated bytes rejectedBlocks = 12 [(scalapb.field).collection_type="List"]; + repeated FringeDataProto fringes = 11 [(scalapb.field).collection_type="List"]; repeated bytes rejectedSenders = 13 [(scalapb.field).collection_type="List"]; // Rholang execution trace RholangStateProto state = 14 [(scalapb.field).no_box = true]; @@ -146,13 +145,18 @@ message SlashSystemDeployDataProto { bytes slashedValidator = 1; } +message EjectSystemDeployDataProto { + bytes ejectedValidator = 1; +} + message CloseBlockSystemDeployDataProto{ } message SystemDeployDataProto{ oneof systemDeploy{ SlashSystemDeployDataProto slashSystemDeploy = 1; - CloseBlockSystemDeployDataProto closeBlockSystemDeploy = 2; + EjectSystemDeployDataProto ejectSystemDeploy = 2; + CloseBlockSystemDeployDataProto closeBlockSystemDeploy = 3; } } @@ -257,6 +261,7 @@ message BlockMetadataProto { bytes memberOfFringe = 40; // View of the block TODO this is also on block, remove here repeated ViewProto view = 41 [(scalapb.field).collection_type="List"]; + repeated bytes ejections = 42 [(scalapb.field).collection_type="List"]; } message FringeDataProto { diff --git a/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala b/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala index 9b48fac9513..9ecc5f030dd 100644 --- a/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala +++ b/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala @@ -7,7 +7,7 @@ import coop.rchain.casper.PrettyPrinter import coop.rchain.crypto.PublicKey import coop.rchain.crypto.signatures.{SignaturesAlg, Signed} import coop.rchain.models.BlockHash.BlockHash -import coop.rchain.models.PCost +import coop.rchain.models.{FringeData, PCost} import coop.rchain.models.syntax._ import coop.rchain.models.Validator.Validator import coop.rchain.models.block.StateHash.StateHash @@ -133,8 +133,7 @@ final case class BlockMessage( justifications: List[BlockHash], bonds: Map[Validator, Long], // Rejections - rejectedDeploys: Set[ByteString], - rejectedBlocks: Set[BlockHash], + fringes: List[FringeData], rejectedSenders: Set[ByteString], // Rholang (tuple space) state change state: RholangState, @@ -167,8 +166,7 @@ object BlockMessage { bm.postStateHash, bm.justifications, bm.bonds.map(b => (b.validator, b.stake)).toMap, - bm.rejectedDeploys.toSet, - bm.rejectedBlocks.toSet, + bm.fringes.map(FringeData.from), bm.rejectedSenders.toSet, state, bm.sigAlgorithm, @@ -190,8 +188,6 @@ object BlockMessage { .sortBy { case (validator, _) => validator } .map { case (validator, stake) => BondProto(validator, stake) } // Sorted rejections - val sortedRejectedDeploys = bm.rejectedDeploys.toList.sorted - val sortedRejectedBlocks = bm.rejectedBlocks.toList.sorted val sortedRejectedSenders = bm.rejectedSenders.toList.sorted // Build proto message BlockMessageProto() @@ -205,8 +201,7 @@ object BlockMessage { .withPostStateHash(bm.postStateHash) .withJustifications(sortedJustifications) .withBonds(sortedBonds) - .withRejectedDeploys(sortedRejectedDeploys) - .withRejectedBlocks(sortedRejectedBlocks) + .withFringes(bm.fringes.map(FringeData.toProto)) .withRejectedSenders(sortedRejectedSenders) .withState(RholangState.toProto(bm.state)) .withSigAlgorithm(bm.sigAlgorithm) @@ -300,6 +295,7 @@ object ProcessedDeploy { sealed trait SystemDeployData final case class SlashSystemDeployData(slashedValidator: Validator) extends SystemDeployData +final case class EjectSystemDeployData(ejectedValidator: Validator) extends SystemDeployData case object CloseBlockSystemDeployData extends SystemDeployData case object Empty extends SystemDeployData @@ -327,6 +323,10 @@ object SystemDeployData { SystemDeployDataProto().withSlashSystemDeploy( SlashSystemDeployDataProto(slashedValidator) ) + case EjectSystemDeployData(ejectedValidator) => + SystemDeployDataProto().withEjectSystemDeploy( + EjectSystemDeployDataProto(ejectedValidator) + ) case CloseBlockSystemDeployData => SystemDeployDataProto().withCloseBlockSystemDeploy( CloseBlockSystemDeployDataProto() diff --git a/models/src/main/scala/coop/rchain/models/BlockMetadata.scala b/models/src/main/scala/coop/rchain/models/BlockMetadata.scala index 994d5ccee60..3f7371d99a0 100644 --- a/models/src/main/scala/coop/rchain/models/BlockMetadata.scala +++ b/models/src/main/scala/coop/rchain/models/BlockMetadata.scala @@ -26,7 +26,9 @@ final case class BlockMetadata( fringeStateHash: StateHash, // Fringe (fringe hash) where/when block is finalized memberOfFringe: Option[Blake2b256Hash], - view: View[Validator] + view: View[Validator], + // local decision to eject sender when computing fringe + ejections: Set[Validator] ) { // BlockMetadata is uniquely identified with BlockHash // - overridden hashCode is to be more performant when used in Set or Map @@ -46,7 +48,8 @@ object BlockMetadata { b.fringe.toSet, b.fringeStateHash, Option(b.memberOfFringe).filterNot(_.isEmpty).map(_.toBlake2b256Hash), - View(b.view.map(x => x.validator -> (x.seqStart.toInt to x.seqEnd.toInt)).toMap) + View(b.view.map(x => x.validator -> (x.seqStart.toInt to x.seqEnd.toInt)).toMap), + b.ejections.toSet ) def toProto(b: BlockMetadata) = BlockMetadataProto( @@ -61,7 +64,8 @@ object BlockMetadata { b.fringe.toList, b.fringeStateHash, b.memberOfFringe.map(_.toByteString).getOrElse(ByteString.EMPTY), - b.view.seen.toList.map { case (v, r) => ViewProto(v, r.start.toLong, r.end.toLong) } + b.view.seen.toList.map { case (v, r) => ViewProto(v, r.start.toLong, r.end.toLong) }, + b.ejections.toList ) def fromBytes(bytes: Array[Byte]): BlockMetadata = @@ -83,6 +87,12 @@ object BlockMetadata { fringe = b.fringe.toSet, fringeStateHash = b.finStateHash, memberOfFringe = none, - view = b.view + view = b.view, + ejections = b.state.systemDeploys + .map(_.systemDeploy) + .collect { + case EjectSystemDeployData(ejectedValidator) => ejectedValidator + } + .toSet ) } diff --git a/models/src/main/scala/coop/rchain/models/FringeData.scala b/models/src/main/scala/coop/rchain/models/FringeData.scala index aedd22f9fc4..f3874fdebe2 100644 --- a/models/src/main/scala/coop/rchain/models/FringeData.scala +++ b/models/src/main/scala/coop/rchain/models/FringeData.scala @@ -1,5 +1,6 @@ package coop.rchain.models +import cats.Show import com.google.protobuf.ByteString import coop.rchain.casper.protocol.FringeDataProto import coop.rchain.models.BlockHash.BlockHash @@ -13,9 +14,7 @@ final case class FringeData( // fringeDiff: Set[BlockHash], stateHash: Blake2b256Hash, // Rejected data in finalized fringe - rejectedDeploys: Set[ByteString], - rejectedBlocks: Set[BlockHash], - rejectedSenders: Set[ByteString] + rejectedDeploys: Set[ByteString] ) { // FringeData is uniquely identified with the hash of fringe hashes // - overridden hashCode is to be more performant when used in Set or Map @@ -40,9 +39,7 @@ object FringeData { b.fringe.toSet, // b.fringeDiff.toSet, b.stateHash.toBlake2b256Hash, - b.rejectedDeploys.toSet, - b.rejectedBlocks.toSet, - b.rejectedSenders.toSet + b.rejectedDeploys.toSet ) def toProto(b: FringeData) = FringeDataProto( @@ -50,13 +47,21 @@ object FringeData { b.fringe.toList, //b.fringeDiff.toList, b.stateHash.toByteString, - b.rejectedDeploys.toList, - b.rejectedBlocks.toList, - b.rejectedSenders.toList + b.rejectedDeploys.toList ) def fromBytes(bytes: Array[Byte]): FringeData = from(FringeDataProto.parseFrom(bytes)) def toBytes(b: FringeData) = FringeData.toProto(b).toByteArray + + implicit def showFringe: Show[FringeData] = new Show[FringeData] { + override def show(t: FringeData): String = + s""" + |${t.fringe.toList.sorted + .map(_.toHexString.take(8)) + .mkString("-")}_@_${t.stateHash.toByteString.toHexString}_R_[${t.rejectedDeploys.toList.sorted + .map(_.toHexString.take(8))}] + |""".stripMargin + } } diff --git a/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala b/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala index 7577cdef883..d2c626307a0 100644 --- a/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala +++ b/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala @@ -1,6 +1,7 @@ package coop.rchain.sdk.dag.merging -import cats.Order +import cats.effect.kernel.Sync +import cats.{Order, Show} import cats.syntax.all._ import scala.collection.compat.immutable.LazyList @@ -10,12 +11,9 @@ import scala.math.Numeric.LongIsIntegral object ConflictResolutionLogic { /** All items in dependency chains. */ - def withDependencies[D](of: Set[D], dependencyMap: Map[D, Set[D]]): Set[D] = { - def next(curOpt: Option[Set[D]]): Option[Set[D]] = curOpt.flatMap { c => - val n = c.flatMap(dependencyMap.getOrElse(_, Set())) - n.nonEmpty.guard[Option].as(n) - } - LazyList.iterate(of.some)(next).takeWhile(_.nonEmpty).flatten.flatten.toSet + def withDependencies[D](of: Seq[D], dependencyMap: Map[D, Set[D]]): Seq[D] = { + def next(x: Seq[D]): Seq[D] = x.flatMap { dependencyMap.getOrElse(_, Set()) } + LazyList.iterate(of)(next).takeWhile(_.nonEmpty).flatten } /** Deploys incompatible with finalized body. */ @@ -197,7 +195,7 @@ object ConflictResolutionLogic { calMergedResult(deploy, balancesAcc) .map((_, rejectedAcc)) .getOrElse( - (balancesAcc, rejectedAcc ++ withDependencies(Set(deploy), dependencyMap)) + (balancesAcc, rejectedAcc ++ withDependencies(Seq(deploy), dependencyMap)) ) } } @@ -214,7 +212,7 @@ object ConflictResolutionLogic { } /** Compute resolution for conflict set. */ - def resolveConflictSet[D: Ordering, CH]( + def resolveConflictSet[F[_]: Sync, D: Ordering: Show, CH]( conflictSet: Set[D], acceptedFinally: Set[D], rejectedFinally: Set[D], @@ -225,28 +223,49 @@ object ConflictResolutionLogic { dependencyMap: Map[D, Set[D]], // support for mergeable mergeableDiffs: Map[D, Map[CH, Long]], - initMergeableValues: Map[CH, Long] - ): (Set[D], Set[D]) = { + initMergeableValues: Map[CH, Long], + log: String => F[Unit] + ): F[(Set[D], Set[D])] = { + val incWithFin = + incompatibleWithFinal(acceptedFinally, rejectedFinally, conflictsMap, dependencyMap) val enforceRejected = withDependencies( - incompatibleWithFinal(acceptedFinally, rejectedFinally, conflictsMap, dependencyMap), + incWithFin.toSeq, dependencyMap ) val conflictSetCompatible = conflictSet -- enforceRejected // conflict map accounting for dependencies val fullConflictsMap = - conflictsMap.view.mapValues(vs => vs ++ withDependencies(vs, dependencyMap)).toMap + conflictsMap.view + .filterKeys(conflictSetCompatible) + .mapValues { vs => + val vsConfitSet = vs.filter(conflictSetCompatible) + vsConfitSet ++ withDependencies(vsConfitSet.toSeq, dependencyMap) + } + .toMap // find rejection combinations possible val rejectionOptions = computeRejectionOptions(fullConflictsMap) - // add to rejection options rejections caused by mergeable channels overflow - val mergeableOverflowRejectionOptions = addMergeableOverflowRejections( - conflictSet, - dependencyMap, - rejectionOptions, - initMergeableValues, - mergeableDiffs - ) - // find optimal rejection - val resolved = computeOptimalRejection(mergeableOverflowRejectionOptions, cost) - (conflictSetCompatible -- resolved, resolved ++ enforceRejected) + log( + s"acceptedFinally ${acceptedFinally.map(_.show).mkString("\n")}\n " + + s"rejectedFinally ${rejectedFinally.map(_.show).mkString("\n")}\n " + + s"incWithFin ${incWithFin.map(_.show).mkString("\n")}\n " + + s"rejectionOptions: ${rejectionOptions.map(_.map(_.show)).mkString("\n")}\n " + + s"enforceRejected: ${enforceRejected.toList.map(_.show)}" + ).flatMap { _ => + // add to rejection options rejections caused by mergeable channels overflow + val mergeableOverflowRejectionOptions = addMergeableOverflowRejections( + conflictSet, + dependencyMap, + rejectionOptions, + initMergeableValues, + mergeableDiffs + ) + log( + s"mergeableOverflowRejectionOptions ${mergeableOverflowRejectionOptions.map(_.map(_.show)).mkString("\n")}" + ).as { + // find optimal rejection + val resolved = computeOptimalRejection(mergeableOverflowRejectionOptions, cost) + (conflictSetCompatible -- resolved, resolved ++ enforceRejected) + } + } } }