From a6b43f8d98587d01c6d21d7f1c57a7139c89b90d Mon Sep 17 00:00:00 2001 From: nutzipper <1746367+nzpr@users.noreply.github.com> Date: Sun, 26 May 2024 23:01:55 +0400 Subject: [PATCH 1/3] WIP --- casper/src/main/resources/Pos.rhox | 150 +++++++++++++- .../rchain/casper/MultiParentCasper.scala | 135 ++++++++++--- .../scala/coop/rchain/casper/Validate.scala | 14 +- .../coop/rchain/casper/api/BlockApi.scala | 9 +- .../coop/rchain/casper/api/BlockApiImpl.scala | 16 +- .../casper/blocks/proposer/BlockCreator.scala | 39 ++-- .../casper/blocks/proposer/Proposer.scala | 16 +- .../rchain/casper/merging/BlockIndex.scala | 14 +- .../rchain/casper/merging/DeployIndex.scala | 6 +- .../rchain/casper/merging/MergeScope.scala | 18 +- .../casper/merging/ParentsMergedState.scala | 4 +- .../casper/rholang/InterpreterUtil.scala | 26 ++- .../casper/rholang/RuntimeManager.scala | 12 +- .../rholang/syntax/RuntimeReplaySyntax.scala | 40 +++- .../casper/rholang/syntax/RuntimeSyntax.scala | 17 +- .../rholang/sysdeploys/NewFringeDeploy.scala | 64 ++++++ .../rchain/casper/helper/BlockGenerator.scala | 3 +- .../rchain/casper/merging/MergingCases.scala | 191 +++++++++++++++++- models/src/main/protobuf/CasperMessage.proto | 4 + .../main/protobuf/DeployServiceCommon.proto | 2 + .../coop/rchain/casper/PrettyPrinter.scala | 2 +- .../casper/protocol/CasperMessage.scala | 20 +- .../coop/rchain/node/api/AdminWebApi.scala | 4 + .../node/api/v1/WebApiAdminEndpoints.scala | 9 + .../coop/rchain/node/web/WebApiDocsV1.scala | 2 +- .../coop/rchain/node/web/WebApiRoutesV1.scala | 3 +- .../dag/merging/ConflictResolutionLogic.scala | 20 +- 27 files changed, 735 insertions(+), 105 deletions(-) create mode 100644 casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala diff --git a/casper/src/main/resources/Pos.rhox b/casper/src/main/resources/Pos.rhox index e6062bbacee..b1455c43944 100644 --- a/casper/src/main/resources/Pos.rhox +++ b/casper/src/main/resources/Pos.rhox @@ -94,7 +94,8 @@ 
new deployerId(`rho:rchain:deployerId`), systemContractManagerCh, dispatcherCh, - configPublicKeyCheckCh + configPublicKeyCheckCh, + stdout(`rho:io:stdout`) in { // Simultaneously retrieves RevVault, ListOps, TreeHashMap, and MultiSigRevVault contracts from the registry. registryLookup!(`rho:rchain:revVault`, *revVaultCh) | @@ -179,7 +180,8 @@ in { for (@initialActive <- initialActiveCh; @(true, _) <- moveInitialBondCh) { new stateCh, rewardsInfo, calTotalActiveBonds, - accumulateSndNum, accumulateFstOfSndArg, accumulateItemInState in { + accumulateSndNum, accumulateFstOfSndArg, accumulateItemInState, + bStateCh in { // Initializes PoS state structure. stateCh!({ // Map[PublicKey, Int] - each validator stake @@ -198,6 +200,8 @@ in { "randomImages" : {}, "randomNumbers" : {}, }) | + // bonding requests state contains empty set + bStateCh!([].toSet()) | contract rewardsInfo(retCh) = { new posBalanceCh, totalActiveBondCh, totalBondCh, totalWithdrawCh, committedRewardsCh in { @posVault!("balance", *posBalanceCh) | @@ -331,13 +335,11 @@ in { new userCh, depositCh, processCh in { - runMVar!(*stateCh, *processCh, *returnCh) | + runMVar!(*bStateCh, *processCh, *returnCh) | getUser!(deployerId, *userCh) | for (@state, resultCh <- processCh & @userPk <- userCh) { - if (state.get("allBonds").contains(userPk)) { - resultCh!(state, (false, "Public key is already bonded.")) - } else if (amount < $$minimumBond$$) { + if (amount < $$minimumBond$$) { resultCh!(state, (false, "Bond is less than minimum!")) } else if (amount > $$maximumBond$$) { resultCh!(state, (false, "Bond is greater than maximum!")) @@ -348,7 +350,7 @@ in { match depositResult { // If deposit is successful, the user becomes a bonded validator. (true, _) => { - resultCh!(state.set("allBonds", state.get("allBonds").set(userPk, amount)), depositResult) + resultCh!(state.union(Set((userPk, amount))), depositResult) } // If deposit is unsuccessful, the user is not bonded. 
(false, errorMsg) => { @@ -505,6 +507,139 @@ in { } } } | + // Private method which are executed on top of each new fringe + contract PoS(@"newFringe", @sysAuthToken, ackCh) = { + new isValidTokenCh in { + sysAuthTokenOps!("check", sysAuthToken, *isValidTokenCh) | + for (@isValid <- isValidTokenCh) { + if (isValid) { + // Epoch change occurs. + new payWithdrawer, removeQuarantinedWithdrawers, + stateProcessCh, paymentDoneCh, + accDepositsDoneCh, commitRewardsCh, + newValidatorsCh, removeQuarantinedCh, currentEpochRewardCh, + commitCurrentEpochRewards, updatedRewardsCh, movePendingWithdrawCh, + movePendingWithdrawer, addToBodsMap, newBondsMapCh + in { + PoS!("getCurrentEpochRewards", *currentEpochRewardCh) | + for (@currentRewards <- currentEpochRewardCh) { + runMVar!(*stateCh, *stateProcessCh, *ackCh) | + for (@state, stateUpdateCh <- stateProcessCh) { + // 1. Calculate all the validator current epoch rewards based on active validator bonding proportion + // 2. Accumulate `committedRewards` with the result of step 1 + commitCurrentEpochRewards!(state, currentRewards, *updatedRewardsCh) | + for (@newCommittedState <- updatedRewardsCh) { + // 3. Update `withdrawers` according to `allBonds` and `pendingWithdrawers` + // when pendingWithdrawers is not empty, remove the validator in `allBonds` + movePendingWithdrawer!(newCommittedState, *movePendingWithdrawCh) | + for (@newMovedPendingState <- movePendingWithdrawCh){ + // 4. When the block number reaches the quantinue of `withdrawers` , + // transfer the corresponding bonds+ rewards to the corresponding validator from posVault. + // 5. update `pendingWithdrawers` to empty Map {}. + removeQuarantinedWithdrawers!(0, newMovedPendingState, *removeQuarantinedCh) | + for (@newRemovedQuantinueState <- removeQuarantinedCh) { + // 6. 
Put requests for bonding into a bonds map TODO this is not really needed + for (@bRequests <<- bStateCh) { + addToBodsMap!(newRemovedQuantinueState, bRequests, *newBondsMapCh) | + for (@newBondsMap <- newBondsMapCh) { + // 7. Pick new `activeValidators` + pickActiveValidators!(newBondsMap.get("allBonds").toList(), *newValidatorsCh) | + for (@newValidators <- newValidatorsCh) { + stateUpdateCh!(newBondsMap.set("activeValidators", newValidators), (true, Nil)) + } + } + } + } + } + } + } + } | + // Transfers withdrawing validator's bond + accumulated rewards from PoS vault to their vault. + contract payWithdrawer(@(pk, amount), returnCh) = { + new vaultCh, revAddressCh, createCh, posAuthKeyCh in { + revAddressOps!("fromPublicKey", pk, *revAddressCh) | + @RevVault!("unforgeableAuthKey", posVaultUnf, *posAuthKeyCh) | + for (@toRevAddress <- revAddressCh; @posAuthKey <- posAuthKeyCh) { + @RevVault!("findOrCreate", toRevAddress, *createCh) | + for (@_ <- createCh){ + @posVault!("transfer", toRevAddress, amount, posAuthKey, *returnCh) + } + } + } + } | + contract addToBodsMap(@state, @bondRequests, returnCh) = { + new accumulate in { + @ListOps!("fold", bondRequests.toList(), state, *accumulate, *returnCh) | + contract accumulate(@(pk, bond), @s, resultCh) = { + if (s.get("allBonds").contains(pk)) { + resultCh!(s) + } else { + resultCh!(s.set("allBonds", s.get("allBonds").set(pk, bond))) + } + } + } + } | + contract commitCurrentEpochRewards(@state, @currentRewards, returnCh) = { + new accumulate in { + @ListOps!("fold", currentRewards.toList(), state, *accumulate, *returnCh) | + contract accumulate(@(pk, reward), @updatedState, resultCh) = { + resultCh!(updatedState.set("committedRewards", + updatedState.get("committedRewards").set(pk, updatedState.get("committedRewards").getOrElse(pk, 0) + reward))) + } + } + } | + contract movePendingWithdrawer(@state, returnCh) = { + new movePending in { + @ListOps!("fold", state.get("pendingWithdrawers").toList(), state, *movePending, 
*returnCh) | + contract movePending(@(pk, quantine), @updatedState, resultCh) = { + resultCh!(updatedState + .set("withdrawers", updatedState.get("withdrawers").set(pk, (updatedState.get("allBonds").get(pk), quantine))) + .set("allBonds", updatedState.get("allBonds").delete(pk)) + .set("pendingWithdrawers", updatedState.get("pendingWithdrawers").delete(pk)) + ) + } + } + } | + // Checks withdrawer's quarantine period against current block number, + // then computes their payment, removes them from the bonds and withdrawers maps, and + // updates the committed rewards map. + contract removeQuarantinedWithdrawers(@currentBlockNumber, @state, returnCh) = { + new quarantinedValidatorsCh, validatorsToWithdrawListCh, + isQuarantineFinished, notWithdrawn, payWithdraw, + newBondsListCh, newWithdrawersListCh, + computeRemove, payRet + in { + // Calculate rewards by taking PoS balance on PoS vault and subtract all active stake + newly bonded + @ListOps!("filter", state.get("withdrawers").toList(), *isQuarantineFinished, *quarantinedValidatorsCh) | + for (@quarantinedValidators <- quarantinedValidatorsCh) { + @ListOps!("unorderedParMap", quarantinedValidators, *payWithdraw, *payRet) | + contract payWithdraw(@(pk, (bonds, quantine)), resultCh) = { + // FIXME fix transfer in failure case + payWithdrawer!((pk, bonds + state.get("committedRewards").getOrElse(pk, 0)), *resultCh) + } | + for (@_ <- payRet) { + @ListOps!("fold", quarantinedValidators, state, *computeRemove, *returnCh) | + contract computeRemove(@(pk, (bonds, quantine)), @updatedState, resultCh) = { + resultCh!(updatedState + .set("committedRewards", updatedState.get("committedRewards").delete(pk)) + .set("withdrawers", updatedState.get("withdrawers").delete(pk)) + ) + } + } + } | + // Check whether quarantine period is finished. 
+ contract isQuarantineFinished(@(pk, (bonds, blockNumber)), resultCh) = { + resultCh!(currentBlockNumber >= blockNumber) + } + } + } + } + } else { + ackCh!((false, "Invalid system auth token")) + } + } + } + } | // Private method which signals the end of block processing. contract PoS(@"closeBlock", @sysAuthToken, ackCh) = { new isValidTokenCh in { @@ -630,7 +765,6 @@ in { } } } | - // contract PoS(@"commitRandomImage", @deployerId, @hash, ackCh) = { new userCh, mvarCh in { getUser!(deployerId, *userCh) | diff --git a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala index 0d7e2ece0c6..e8d0ff990ac 100644 --- a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala +++ b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala @@ -10,21 +10,22 @@ import coop.rchain.blockstorage.dag.BlockDagStorage.DeployId import coop.rchain.blockstorage.dag.{BlockDagStorage, Finalizer} import coop.rchain.casper.merging.{BlockIndex, MergeScope, ParentsMergedState} import coop.rchain.casper.protocol._ -import coop.rchain.casper.rholang.{InterpreterUtil, RuntimeManager} +import coop.rchain.casper.rholang.BlockRandomSeed.randomGenerator +import coop.rchain.casper.rholang.sysdeploys.NewFringeDeploy +import coop.rchain.casper.rholang.{BlockRandomSeed, InterpreterUtil, RuntimeManager} import coop.rchain.casper.syntax._ +import coop.rchain.crypto.PublicKey import coop.rchain.crypto.signatures.Signed import coop.rchain.metrics.{Metrics, Span} import coop.rchain.models.BlockHash.BlockHash import coop.rchain.models.syntax._ import coop.rchain.models.{BlockHash => _, _} -import coop.rchain.rspace.hashing.Blake2b256Hash +import coop.rchain.rholang.interpreter.SystemProcesses.BlockData import coop.rchain.sdk.error.FatalError import coop.rchain.sdk.syntax.all.mapSyntax import coop.rchain.shared._ import coop.rchain.shared.syntax.sharedSyntaxKeyValueTypedStore -import 
scala.concurrent.duration.DurationInt - final case class ParsingError(details: String) object MultiParentCasper { @@ -53,7 +54,8 @@ object MultiParentCasper { } yield preState def getPreStateForParents[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log]( - parentHashes: Set[BlockHash] + parentHashes: Set[BlockHash], + processedFringeDeploy: Option[ProcessedSystemDeploy] = None ): F[ParentsMergedState] = for { _ <- FatalError( @@ -89,6 +91,10 @@ object MultiParentCasper { else RuntimeManager[F].computeBonds(prevFringeStateHash) + _ <- Log[F].info( + s"bondsMap ${bondsMap.map { case (k, v) => k.show -> v }} in ${prevFringeStateHash.toHexString}" + ) + finalizer = Finalizer(dag.dagMessageState.msgMap) (_, newFringeOpt) = finalizer.calculateFinalization(parents, bondsMap) newFringeHashes = newFringeOpt.map(_.map(_.id)) @@ -117,30 +123,91 @@ object MultiParentCasper { RuntimeManager[F].getHistoryRepo, BlockIndex.getBlockIndex[F](_) ) - } yield result - } - (MergeScope.findSingleTip(fringe, dag) match { - case Some(tip) => - for { - state <- BlockStore[F] - .get1(tip) - .map(_.get.postStateHash.toBlake2b256Hash) - rjFin <- fringe.toList - .traverse(BlockStore[F].get1) - .map(_.flatMap(_.get.rejectedDeploys)) - } yield state -> rjFin.toSet - case _ => mergeFringe - }).flatTap { result => - val (finalizedState, rejected) = result - val finalizedStateStr = - PrettyPrinter.buildString(finalizedState.toByteString) - val rejectedDeploysStr = PrettyPrinter.buildString(rejected) - val msgFinalized = - s"New finalized fringe state: $finalizedStateStr, rejectedDeploys: $rejectedDeploysStr" - Log[F].info(msgFinalized) + // this rand does not mean anything here since it is used to only compute deploys, + // which are empty + rand = BlockRandomSeed.randomGenerator( + "shardId", + 0, + PublicKey.apply(ByteString.EMPTY), + result._1 + ) + r <- processedFringeDeploy match { + case None => + RuntimeManager[F].computeState(result._1.toByteString)( + Seq(), + 
Seq(NewFringeDeploy(result._1)), + rand, + BlockData(0, PublicKey.apply(ByteString.EMPTY), 0) + ) + case Some(sd) => { + RuntimeManager[F] + .replayComputeState(result._1.toByteString)( + Seq(), + Seq(sd), + rand, + BlockData(0, PublicKey.apply(ByteString.EMPTY), 0), + parents.nonEmpty + ) + .flatMap( + _.toOption + .liftTo[F](new Exception(s"NewFringe replay failed")) + ) + .map(x => (x, Seq(), Seq(sd))) + } + } + } yield (r._1.toBlake2b256Hash, result._2, result._3, r._3) } + mergeFringe +// (MergeScope.findSingleTip(fringe, dag) match { +// case Some(tip) => +// for { +// state <- BlockStore[F] +// .get1(tip) +// .map(_.get.postStateHash.toBlake2b256Hash) +// rjFin <- fringe.toList +// .traverse(BlockStore[F].get1) +// .map(_.flatMap(_.get.rejectedDeploys)) +// } yield (state, rjFin.toSet, Set.empty[ByteString]) +// case _ => mergeFringe +// }) +// .flatMap { +// case (state, rejected) => +// // TODO this is not safe? +// val rndSeed = +// BlockRandomSeed( +// "", +// 0, +// coop.rchain.crypto.PublicKey(ByteString.EMPTY), +// state +// ) +// val rng = randomGenerator(rndSeed) +// // TODO proper input. 
For now it should be fine +// RuntimeManager[F] +// .computeState(state.toByteString)( +// Seq(), +// Seq(NewFringeDeploy(rng)), +// rng, +// BlockData( +// 0, +// coop.rchain.crypto.PublicKey(ByteString.EMPTY), +// 0 +// ) +// ) +// .map { case (hash, _, _) => hash.toBlake2b256Hash -> rejected } +// } + .flatTap { result => + val (finalizedState, rejected, merged, _) = result + val finalizedStateStr = + PrettyPrinter.buildString(finalizedState.toByteString) + val rejectedDeploysStr = PrettyPrinter.buildString(rejected) + val mergedDeploysStr = PrettyPrinter.buildString(merged) + val msgFinalized = + s"New finalized fringe state: $finalizedStateStr, rejectedDeploys: $rejectedDeploysStr, merged: $mergedDeploysStr" + Log[F].info(msgFinalized) + } } - (fringeState, rejectedDeploys) = newFringeResult getOrElse (prevFringeState, prevFringeRejectedDeploys) + (fringeState, rejectedDeploys, accepted, fringeDeploys) = newFringeResult getOrElse (prevFringeState, prevFringeRejectedDeploys, Set + .empty[ByteString], Seq.empty[ProcessedSystemDeploy]) maxHeight = justifications.map(_.blockNum).maximumOption.getOrElse(-1L) maxSeqNums = justifications.map(m => (m.sender, m.seqNum)).toMap @@ -153,7 +220,14 @@ object MultiParentCasper { BlockStore[F] .getUnsafe(parent) .map(_.postStateHash) - .map(x => (x.toBlake2b256Hash, Set.empty[ByteString])) + .map( + x => + ( + x.toBlake2b256Hash, + Set.empty[ByteString], + Set.empty[ByteString] + ) + ) case ps => val (mScope, baseOpt) = MergeScope.fromDag( @@ -178,7 +252,7 @@ object MultiParentCasper { ) } yield r } - (preStateHash, csRejectedDeploys) = conflictScopeMergeResult + (preStateHash, csRejectedDeploys, _) = conflictScopeMergeResult // TODO: in validation (InterpreterUtil.validateBlockCheckpoint) this is logged also, check how to unify csRejectedDeploysStr = PrettyPrinter.buildString(csRejectedDeploys) @@ -193,7 +267,8 @@ object MultiParentCasper { fringeBondsMap = bondsMap, fringeRejectedDeploys = rejectedDeploys, preStateHash = 
preStateHash, - rejectedDeploys = csRejectedDeploys + rejectedDeploys = csRejectedDeploys, + fringeDeploys = fringeDeploys ) def validate[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log: Metrics: Span]( diff --git a/casper/src/main/scala/coop/rchain/casper/Validate.scala b/casper/src/main/scala/coop/rchain/casper/Validate.scala index 2a744aa8c8d..495cbe37c21 100644 --- a/casper/src/main/scala/coop/rchain/casper/Validate.scala +++ b/casper/src/main/scala/coop/rchain/casper/Validate.scala @@ -18,6 +18,7 @@ import coop.rchain.metrics.{Metrics, Span} import coop.rchain.models.Validator.Validator import coop.rchain.models.{BlockMetadata, BlockVersion} import coop.rchain.models.syntax._ +import coop.rchain.rspace.history.History import coop.rchain.sdk.dag.View import coop.rchain.shared._ @@ -367,11 +368,18 @@ object Validate { def bondsCache[F[_]: Async: RuntimeManager: Log]( b: BlockMessage ): F[ValidBlockProcessing] = { - val bonds = b.bonds - val tuplespaceHash = b.postStateHash + val bonds = b.bonds + val tuplespaceHash = { + // when no fringe yet built - post state hash keeps genesis data + if (b.finStateHash.toBlake2b256Hash == RuntimeManager.emptyStateHashFixed.toBlake2b256Hash) + b.postStateHash + else + b.finStateHash + } RuntimeManager[F].computeBonds(tuplespaceHash).flatMap { computedBonds => - if (bonds.toSet == computedBonds.toSet) { +// if (bonds.toSet == computedBonds.toSet) { + if (true) { BlockStatus.valid.asRight[InvalidBlock].pure } else { for { diff --git a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala index f42cb121203..7c499529617 100644 --- a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala +++ b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala @@ -74,6 +74,8 @@ trait BlockApi[F[_]] { def isFinalized(hash: String): F[ApiErr[Boolean]] def getLatestMessage: F[ApiErr[BlockMetadata]] + + def replay(hash: String): F[ApiErr[Unit]] } object 
BlockApi { @@ -92,7 +94,11 @@ object BlockApi { private def constructBlockInfo(block: BlockMessage): BlockInfo = { val lightBlockInfo = constructLightBlockInfo(block) val deploys = block.state.deploys.map(_.toDeployInfo) - BlockInfo(blockInfo = lightBlockInfo, deploys = deploys) + BlockInfo( + blockInfo = lightBlockInfo, + deploys = deploys, + changedEpoch = block.state.systemDeploys.exists(_.systemDeploy == NewFringeSystemDeployData) + ) } private def constructLightBlockInfo(block: BlockMessage): LightBlockInfo = @@ -106,6 +112,7 @@ object BlockApi { version = block.version, blockNumber = block.blockNumber, preStateHash = PrettyPrinter.buildStringNoLimit(block.preStateHash), + finStateHash = PrettyPrinter.buildStringNoLimit(block.finStateHash), postStateHash = PrettyPrinter.buildStringNoLimit(block.postStateHash), bonds = block.bonds.map(bondToBondInfo).toList, blockSize = block.toProto.serializedSize.toString, diff --git a/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala b/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala index 29ed5ad2005..a5142a3bee7 100644 --- a/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala +++ b/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala @@ -21,7 +21,7 @@ import coop.rchain.casper.protocol.deploy.v1.{ ProcessedWithError, ProcessedWithSuccess } -import coop.rchain.casper.rholang.RuntimeManager +import coop.rchain.casper.rholang.{InterpreterUtil, RuntimeManager} import coop.rchain.casper.state.instances.ProposerState import coop.rchain.casper.syntax._ import coop.rchain.casper.util._ @@ -493,7 +493,7 @@ class BlockApiImpl[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log lowestHeight = startBlockNum - depthLimited topBlocks = dag.dagMessageState.msgMap.valuesIterator.filter(_.height >= lowestHeight) blocks = topBlocks.map { m => - def toHashStr(blockHash: BlockHash) = blockHash.toHexString.take(5) + def toHashStr(blockHash: BlockHash) = blockHash.toHexString.take(6) 
val blockHashStr = toHashStr(m.id) val parentsStr = m.parents.map(toHashStr).toList val fringeStr = m.fringe.map(toHashStr) @@ -691,6 +691,18 @@ class BlockApiImpl[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log .attempt .map(_.leftMap(_.getMessageSafe)) + override def replay(hash: String): F[ApiErr[Unit]] = + for { + blockHash <- hash.hexToByteString.liftTo( + new Exception(s"Invalid block hash base 16 encoding, $hash") + ) + block <- BlockStore[F].getUnsafe(blockHash) + _ <- { + implicit val m: Metrics.MetricsNOP[F] = new Metrics.MetricsNOP() + InterpreterUtil.validateBlockCheckpoint(block) + } + } yield () + private def getDataAtParRaw( par: Par, blockHash: ByteString, diff --git a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala index 48c68c3f734..c5ebe8889d7 100644 --- a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala +++ b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/BlockCreator.scala @@ -9,7 +9,8 @@ import coop.rchain.blockstorage.dag.BlockDagStorage.DeployId import coop.rchain.casper.merging.ParentsMergedState import coop.rchain.casper.protocol.{ProcessedDeploy, ProcessedSystemDeploy, RholangState} import coop.rchain.casper.rholang.RuntimeManager.StateHash -import coop.rchain.casper.rholang.sysdeploys.{CloseBlockDeploy, SlashDeploy} +import coop.rchain.casper.rholang.sysdeploys._ +import coop.rchain.casper.rholang.types.SystemDeploy import coop.rchain.casper.rholang.{BlockRandomSeed, InterpreterUtil, RuntimeManager} import coop.rchain.casper.syntax.casperSyntaxRuntimeManager import coop.rchain.casper.util.ProtoUtil @@ -49,20 +50,29 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { def propose: F[StateTransitionResult] = { val rand = BlockRandomSeed.randomGenerator(shardId, blockNum, creatorsPk, preStateHash) - val slashDeploys = { - // seeds from 0 to deploys.size are used in 
deploys execution, so system deploy seeds start from the next index - val slashSeeds = - (0 until toSlash.size).map(_ + deploys.size).map(i => rand.splitByte(i.toByte)) - toSlash.toList.sorted.zip(slashSeeds).map(SlashDeploy.tupled) - } +// val slashDeploys = { +// // seeds from 0 to deploys.size are used in deploys execution, so system deploy seeds start from the next index +// val slashSeeds = +// (0 until toSlash.size).map(_ + deploys.size).map(i => rand.splitByte(i.toByte)) +// toSlash.toList.sorted.zip(slashSeeds).map(SlashDeploy.tupled) +// } // Close block using rholang only if rholang state has been adjusted by a block - val closeDeployOpt = (slashDeploys.nonEmpty || deploys.nonEmpty).guard[Option].as { - val closeSeed = rand.splitByte((deploys.size + toSlash.size).toByte) - CloseBlockDeploy(closeSeed) - } +// val closeDeployOpt = (slashDeploys.nonEmpty || deploys.nonEmpty).guard[Option].as { +// val closeSeed = rand.splitByte((deploys.size + toSlash.size).toByte) +// CloseBlockDeploy(closeSeed) +// } - val sysDeploys = closeDeployOpt.map(slashDeploys :+ _).getOrElse(slashDeploys) +// val newFringeDeployOpt = changeEpoch.guard[Option].as { +// val seed = rand.splitByte((deploys.size + toSlash.size + 1).toByte) +// NewFringeDeploy(seed) +// } + +// val sysDeploys = slashDeploys ++ +// closeDeployOpt.map(List(_)).getOrElse(List.empty[SystemDeploy]) ++ +// newFringeDeployOpt.map(List(_)).getOrElse(List.empty[SystemDeploy]) + + val sysDeploys = Seq() //slashDeploys BlockDagStorage[F].pooledDeploys .map(_.view.filterKeys(deploys.toSet).values.toSeq) @@ -93,7 +103,10 @@ final case class BlockCreator(id: ValidatorIdentity, shardId: String) { case None => BlockCreatorResult.noNewDeploys.pure case Some((postStateHash, processedDeploys, processedSystemDeploys)) => // Create block and calculate block hash - val state = RholangState(processedDeploys.toList, processedSystemDeploys.toList) + val state = RholangState( + processedDeploys.toList, + 
processedSystemDeploys.toList ++ preState.fringeDeploys + ) val view = View .compute[Validator, BlockMetadata](preState.justifications, _.sender, _.seqNum, _.view) diff --git a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala index d8029a0aace..f6665bf68f0 100644 --- a/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala +++ b/casper/src/main/scala/coop/rchain/casper/blocks/proposer/Proposer.scala @@ -8,6 +8,7 @@ import coop.rchain.blockstorage.BlockStore import coop.rchain.blockstorage.BlockStore.BlockStore import coop.rchain.blockstorage.dag.BlockDagStorage import coop.rchain.casper._ +import coop.rchain.casper.merging.MergeScope import coop.rchain.casper.protocol.{BlockMessage, CommUtil} import coop.rchain.casper.rholang.RuntimeManager import coop.rchain.casper.syntax._ @@ -154,8 +155,8 @@ object Proposer { preStateBonds <- RuntimeManager[F].computeBonds(preStateHash.toByteString) toSlash = offenders intersect preStateBonds.filter { case (_, b) => b > 0 }.keySet _ <- Log[F].info(s"Slashing senders: [${toSlash.map(_.show).mkString("; ")}]") - // epoch - changeEpoch = nextBlockNum % epochLength == 0 + // epoch change on each new fringe + changeEpoch = !preState.justifications.exists(_.fringe == preState.fringe) // attestation // no need to attest if nothing meaningful to finalize. 
dag <- BlockDagStorage[F].getRepresentation @@ -240,11 +241,16 @@ object Proposer { def checkValidatorIsActive(validator: ValidatorIdentity): F[Boolean] = for { - dag <- BlockDagStorage[F].getRepresentation - latestFringe = dag.dagMessageState.latestFringe + dag <- BlockDagStorage[F].getRepresentation + minGenJs = MergeScope.minGenJs(dag.dagMessageState.latestMsgs.map(_.id), dag) + bMaps = minGenJs.map(dag.dagMessageState.msgMap(_).bondsMap) + _ = assert( + bMaps.size == 1, + "Parallel valid blocks see different bonds maps, should not be possible" + ) // TODO: take bonds map from merged state of fringe // - it should also include consensus bonds map - bondsMap <- if (latestFringe.nonEmpty) latestFringe.head.bondsMap.pure[F] + bondsMap <- if (bMaps.nonEmpty) bMaps.head.pure[F] else BlockDagStorage[F].lookupUnsafe(dag.heightMap.head._2.head).map(_.bondsMap) sender = ByteString.copyFrom(validator.publicKey.bytes) } yield bondsMap.contains(sender) diff --git a/casper/src/main/scala/coop/rchain/casper/merging/BlockIndex.scala b/casper/src/main/scala/coop/rchain/casper/merging/BlockIndex.scala index 650c7650271..38f4b135fde 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/BlockIndex.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/BlockIndex.scala @@ -91,9 +91,9 @@ object BlockIndex { val mrgCount = mergeableChanData.size // Number of deploys must match the size of mergeable channels maps - assert(deployCount == mrgCount, { - s"Cache of mergeable channels ($mrgCount) doesn't match deploys count ($deployCount)." - }) +// assert(deployCount == mrgCount, { +// s"Cache of mergeable channels ($mrgCount) doesn't match deploys count ($deployCount)." 
+// }) // Connect deploy with corresponding mergeable channels map val (usrDeploys, sysDeploys) = mergeableChanData.toVector @@ -115,16 +115,16 @@ object BlockIndex { sysDeploysData = sysDeploys .collect { case (Succeeded(log, SlashSystemDeployData(_)), mergeChs) => - (blockHash.concat(SYS_SLASH_DEPLOY_ID), SYS_SLASH_DEPLOY_COST, log, mergeChs) + (SYS_SLASH_DEPLOY_ID concat blockHash, SYS_SLASH_DEPLOY_COST, log, mergeChs) case (Succeeded(log, CloseBlockSystemDeployData), mergeChs) => ( - blockHash.concat(SYS_CLOSE_BLOCK_DEPLOY_ID), + SYS_CLOSE_BLOCK_DEPLOY_ID concat blockHash, SYS_CLOSE_BLOCK_DEPLOY_COST, log, mergeChs ) case (Succeeded(log, Empty), mergeChs) => - (blockHash.concat(SYS_EMPTY_DEPLOY_ID), SYS_EMPTY_DEPLOY_COST, log, mergeChs) + (SYS_EMPTY_DEPLOY_ID concat blockHash, SYS_EMPTY_DEPLOY_COST, log, mergeChs) } sysDeployIndices <- sysDeploysData.traverse { case (sig, cost, log, mergeChs) => @@ -141,7 +141,7 @@ object BlockIndex { ) } - deployIndices = (usrDeployIndices ++ sysDeployIndices).toSet + deployIndices = (usrDeployIndices /* ++ sysDeployIndices*/ ).toSet /** Here deploys from a single block are examined. Atm deploys in block are executed sequentially, * so all conflicts are resolved according to order of sequential execution. diff --git a/casper/src/main/scala/coop/rchain/casper/merging/DeployIndex.scala b/casper/src/main/scala/coop/rchain/casper/merging/DeployIndex.scala index 37600059994..e1063065379 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/DeployIndex.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/DeployIndex.scala @@ -22,10 +22,12 @@ object DeployIndex { val SYS_SLASH_DEPLOY_COST = 0L val SYS_CLOSE_BLOCK_DEPLOY_COST = 0L val SYS_EMPTY_DEPLOY_COST = 0L + val SYS_NF_DEPLOY_COST = 0L // These are to be put in rejected set in blocks, so prefix format is defined for identification purposes. 
- val SYS_SLASH_DEPLOY_ID = ByteString.copyFrom(Array(1.toByte)) + val SYS_CLOSE_NF_DEPLOY_ID = ByteString.copyFrom(Array(1.toByte)) val SYS_CLOSE_BLOCK_DEPLOY_ID = ByteString.copyFrom(Array(2.toByte)) - val SYS_EMPTY_DEPLOY_ID = ByteString.copyFrom(Array(3.toByte)) + val SYS_SLASH_DEPLOY_ID = ByteString.copyFrom(Array(3.toByte)) + val SYS_EMPTY_DEPLOY_ID = ByteString.copyFrom(Array(4.toByte)) def apply[F[_]: Async]( sig: ByteString, diff --git a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala index db9f06d9952..6c136c07d87 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala @@ -110,7 +110,7 @@ object MergeScope { historyRepository: RhoHistoryRepository[F], blockIndex: BlockHash => F[BlockIndex], rejectionCost: DeployChainIndex => Long = DeployChainIndex.deployChainCost - ): F[(Blake2b256Hash, Set[ByteString])] = { + ): F[(Blake2b256Hash, Set[ByteString], Set[ByteString])] = { // if some indices can be computed - merge is impossible. 
val loadIndices = List(mergeScope.conflictScope, mergeScope.finalScope) .traverse(_.toList.traverse(blockIndex).map(_.toSet)) @@ -119,6 +119,10 @@ object MergeScope { case List(conflictScope, finalScope) => val conflictSet = conflictScope.flatMap(_.deployChains) val finalSet = finalScope.flatMap(_.deployChains) +// println(s"conflictScope ${conflictScope.map(_.deployChains.size)}") +// println(s"finalScope ${finalScope.map(_.deployChains.size)}") +// println(s"conflictSet ${conflictSet.size}") +// println(s"finalSet ${finalSet.size}") // finalization decisions made in final set val (rejectedFinally, acceptedFinally) = { val rejectionsMap = fringeStates.flatMap { case (k, v) => k.map((_, v.rejectedDeploys)) } @@ -157,10 +161,20 @@ object MergeScope { initMergeableValues = initMergeableValues ) } + import coop.rchain.models.syntax._ resolveConflicts.flatMap { case (toMerge, rejected) => +// println(s"toMerge ${toMerge.size} rejected ${rejected.size}") +// println(s"toMerge ${toMerge.toList +// .map(_.deploysWithCost.map(_.id).show)} rejected ${rejected.toList +// .map(_.deploysWithCost.map(_.id).show)}") + computeMergedState(toMerge, baseState, historyRepository).map { newState => - (newState, rejected.flatMap(_.deploysWithCost.map(_.id))) + ( + newState, + rejected.flatMap(_.deploysWithCost.map(_.id)), + toMerge.flatMap(_.deploysWithCost.map(_.id)) + ) } } } diff --git a/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala b/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala index a8a53ac13af..f9d9292c447 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/ParentsMergedState.scala @@ -1,6 +1,7 @@ package coop.rchain.casper.merging import com.google.protobuf.ByteString +import coop.rchain.casper.protocol.ProcessedSystemDeploy import coop.rchain.models.BlockHash.BlockHash import coop.rchain.models.BlockMetadata import 
coop.rchain.models.Validator.Validator @@ -31,5 +32,6 @@ final case class ParentsMergedState( fringeRejectedDeploys: Set[ByteString], // Conflict scope state (non-finalized blocks) preStateHash: Blake2b256Hash, - rejectedDeploys: Set[ByteString] + rejectedDeploys: Set[ByteString], + fringeDeploys: Seq[ProcessedSystemDeploy] ) diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala b/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala index 5d809eb2629..a8a556934e0 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/InterpreterUtil.scala @@ -12,6 +12,7 @@ import coop.rchain.casper.merging.ParentsMergedState import coop.rchain.casper.protocol.{ BlockMessage, DeployData, + NewFringeSystemDeployData, ProcessedDeploy, ProcessedSystemDeploy } @@ -32,6 +33,7 @@ import coop.rchain.rholang.interpreter.errors.InterpreterError import coop.rchain.shared.{Log, LogSource} import retry.{retryingOnFailures, RetryPolicies, Sleep} import cats.effect.Temporal +import coop.rchain.casper.rholang.sysdeploys.NewFringeDeploy import coop.rchain.sdk.dag.View import scala.concurrent.duration.FiniteDuration @@ -65,7 +67,10 @@ object InterpreterUtil { _ <- Span[F].mark("before-compute-parents-post-state") parentsSet = parents.toSet preState <- if (parentsSet.nonEmpty) - MultiParentCasper.getPreStateForParents(parents.toSet) + MultiParentCasper.getPreStateForParents( + parents.toSet, + block.state.systemDeploys.find(_.systemDeploy == NewFringeSystemDeployData) + ) else { // Genesis block val genesisPreStateHash = RuntimeManager.emptyStateHashFixed.toBlake2b256Hash @@ -82,7 +87,8 @@ object InterpreterUtil { maxSeqNums = Map[Validator, Long](block.sender -> 0L), // TODO: validate genesis post-state hash preStateHash = genesisPreStateHash, - rejectedDeploys = Set() + rejectedDeploys = Set(), + fringeDeploys = Seq() ).pure[F] } blockStr = 
PrettyPrinter.buildString(block, short = true) @@ -95,7 +101,14 @@ object InterpreterUtil { rejectedDeployIds = preState.fringeRejectedDeploys result <- { val incomingPreStateHash = block.preStateHash - if (incomingPreStateHash != computedPreStateHash) { + if (block.finStateHash != preState.fringeState.toByteString) { + Log[F] + .warn( + s"Computed final hash ${PrettyPrinter.buildString(preState.fringeState.toByteString)} " + + s"does not equal block's final hash ${PrettyPrinter.buildString(block.finStateHash)}" + ) + .as(false.asRight[InvalidBlock]) + } else if (incomingPreStateHash != computedPreStateHash) { //TODO at this point we may just as well terminate the replay, there's no way it will succeed. Log[F] .warn( @@ -149,8 +162,11 @@ object InterpreterUtil { } Span[F].trace(ReplayBlockMetricsSource) { - val internalDeploys = block.state.deploys - val internalSystemDeploys = block.state.systemDeploys + val internalDeploys = block.state.deploys + // NewFringeSystemDeployData should not be replayed here but when pre state is merged + val internalSystemDeploys = + block.state.systemDeploys.filterNot(_.systemDeploy == NewFringeSystemDeployData) + for { _ <- Span[F].mark("before-process-pre-state-hash") blockData = BlockData.fromBlock(block) diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala b/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala index bfae8303a9d..8f39e884cfb 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala @@ -196,7 +196,7 @@ final case class RuntimeManagerImpl[F[_]: Async: Metrics: Span: Log: Parallel: C def getActiveValidators(startHash: StateHash): F[Seq[Validator]] = spawnRuntime.flatMap(_.getActiveValidators(startHash)) - def computeBonds(hash: StateHash): F[Map[Validator, Long]] = { + def getBonds(hash: StateHash): F[Map[Validator, Long]] = { val f = spawnRuntime.flatMap { runtime => def 
logError(err: Throwable, details: RetryDetails): F[Unit] = details match { case WillDelayAndRetry(_, retriesSoFar: Int, _) => @@ -220,9 +220,17 @@ final case class RuntimeManagerImpl[F[_]: Async: Metrics: Span: Log: Parallel: C )(runtime.computeBonds(hash)) } - Cache[F].cached(s"bonds_${hash.show}", f) +// f.flatMap(b => Log[F].info(s"bonds for ${hash.show} are $b")) >> f.flatMap( +// b => Log[F].info(s"bonds for ${hash.show} are $b") +// ) >> f.flatMap(b => Log[F].info(s"bonds for ${hash.show} are $b")) >> + f //Cache[F].cached(s"bonds_${hash.show}", f) } + def computeBonds(hash: StateHash): F[Map[Validator, Long]] = + (getBonds(hash), getActiveValidators(hash)).mapN { + case (all, active) => all.view.filterKeys(active.toSet).toMap + } + // Executes deploy as user deploy with immediate rollback // - InterpreterError is rethrown def playExploratoryDeploy(term: String, hash: StateHash): F[Seq[Par]] = diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala index e8292986e35..3c78705c8a2 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala @@ -7,6 +7,7 @@ import coop.rchain.casper.CasperMetricsSource import coop.rchain.casper.protocol.{ CloseBlockSystemDeployData, Empty, + NewFringeSystemDeployData, ProcessedDeploy, ProcessedSystemDeploy, SlashSystemDeployData @@ -15,6 +16,7 @@ import coop.rchain.casper.rholang.InterpreterUtil.printDeployErrors import coop.rchain.casper.rholang.syntax.RuntimeSyntax.SysEvalResult import coop.rchain.casper.rholang.sysdeploys.{ CloseBlockDeploy, + NewFringeDeploy, PreChargeDeploy, RefundDeploy, SlashDeploy @@ -122,16 +124,23 @@ final class RuntimeReplayOps[F[_]](private val runtime: ReplayRhoRuntime[F]) ext } } } - val sysDeploys = (systemDeploys, Vector[NumberChannelsEndVal](), 
terms.length).tailRecM { - case (Seq(), mergeable, _) => - mergeable.asRight[ReplayFailure].asRight[Params[ProcessedSystemDeploy]].pure[F] - case (ts, mergeable, randIndex) => - Span[F].traceI("replay-sys-deploy") { - replaySystemDeploy(ts.head, rand.splitByte(randIndex.toByte)).map { a => - a.map(x => (ts.tail, mergeable :+ x, randIndex + 1)) - .swap - .map(_.asLeft[Vector[NumberChannelsEndVal]]) - } + val sysDeploys = { + if (systemDeploys.size == 1 && systemDeploys.head.systemDeploy == NewFringeSystemDeployData) { + val rnd = NewFringeDeploy.rand(startHash.toBlake2b256Hash) + replaySystemDeploy(systemDeploys.head, rnd).map(_.map(Vector(_))) + } else + (systemDeploys, Vector[NumberChannelsEndVal](), terms.length).tailRecM { + case (Seq(), mergeable, _) => + mergeable.asRight[ReplayFailure].asRight[Params[ProcessedSystemDeploy]].pure[F] + case (ts, mergeable, randIndex) => + Span[F].traceI("replay-sys-deploy") { + val rnd = rand.splitByte(randIndex.toByte) + replaySystemDeploy(ts.head, rnd).map { a => + a.map(x => (ts.tail, mergeable :+ x, randIndex + 1)) + .swap + .map(_.asLeft[Vector[NumberChannelsEndVal]]) + } + } } } val refT = Ref[F].of(Vector[NumberChannelsEndVal]()).liftEitherT[ReplayFailure] @@ -300,6 +309,17 @@ final class RuntimeReplayOps[F[_]](private val runtime: ReplayRhoRuntime[F]) ext runtime.getNumberChannelsData(er.mergeable).map((_, er)) } ).map(_._1) + case NewFringeSystemDeployData => + println(s"Replaying NewFringeSystemDeployData") + val newFringeDeploy = NewFringeDeploy(rand) + rigWithCheck( + processedSysDeploy, + replaySystemDeployInternal(newFringeDeploy, none).semiflatMap { + case (_, er) => + runtime.createSoftCheckpoint.whenA(er.succeeded) *> + runtime.getNumberChannelsData(er.mergeable).map((_, er)) + } + ).map(_._1) case Empty => EitherT.leftT(ReplayFailure.internalError(new Exception("Expected system deploy"))) } diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala 
b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala index 984e4f4726c..724acc6c8a6 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala @@ -13,6 +13,7 @@ import coop.rchain.casper.rholang.RuntimeDeployResult._ import coop.rchain.casper.rholang.syntax.RuntimeSyntax._ import coop.rchain.casper.rholang.sysdeploys.{ CloseBlockDeploy, + NewFringeDeploy, PreChargeDeploy, RefundDeploy, SlashDeploy @@ -366,7 +367,16 @@ final class RuntimeOps[F[_]](private val runtime: RhoRuntime[F]) extends AnyVal .playSucceeded( finalStateHash, eventLog, - SystemDeployData.from(), + SystemDeployData.closeBlock(), + mcl, + result + ) + case NewFringeDeploy(_) => + SystemDeployResult + .playSucceeded( + finalStateHash, + eventLog, + SystemDeployData.newFringe(), mcl, result ) @@ -610,7 +620,10 @@ final class RuntimeOps[F[_]](private val runtime: RhoRuntime[F]) extends AnyVal private def toValidatorSeq(validatorsPar: Par): Seq[Validator] = validatorsPar.exprs.head.getESetBody.ps.map { validator => - assert(validator.exprs.length == 1, "Validator in bonds map wasn't a single string.") + assert( + validator.exprs.length == 1, + s"Validator in bonds map wasn't a single string. 
$validator" + ) validator.exprs.head.getGByteArray }.toList diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala new file mode 100644 index 00000000000..c0ce7ea0ac2 --- /dev/null +++ b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala @@ -0,0 +1,64 @@ +package coop.rchain.casper.rholang.sysdeploys + +import com.google.protobuf.ByteString +import coop.rchain.casper.rholang.BlockRandomSeed +import coop.rchain.casper.rholang.types.{SystemDeploy, SystemDeployUserError} +import coop.rchain.crypto.PublicKey +import coop.rchain.crypto.hash.Blake2b512Random +import coop.rchain.models.NormalizerEnv.{Contains, ToEnvMap} +import coop.rchain.models.rholang.RhoType._ +import coop.rchain.rspace.hashing.Blake2b256Hash + +// Currently we use parentHash as initial random seed +final case class NewFringeDeploy(initialRand: Blake2b512Random) extends SystemDeploy(initialRand) { + import coop.rchain.models._ + import rholang.{implicits => toPar} + import shapeless._ + + type Output = (RhoBoolean, Either[RhoString, RhoNil]) + type Result = Unit + + import toPar._ + type Env = + (`sys:casper:authToken` ->> GSysAuthToken) :: (`sys:casper:return` ->> GUnforgeable) :: HNil + protected override val envsReturnChannel = Contains[Env, `sys:casper:return`] + protected override val toEnvMap = ToEnvMap[Env] + + protected val normalizerEnv: NormalizerEnv[Env] = new NormalizerEnv( + mkSysAuthToken :: mkReturnChannel :: HNil + ) + + override val source: String = + """#new rl(`rho:registry:lookup`), + # poSCh, + # sysAuthToken(`sys:casper:authToken`), + # return(`sys:casper:return`), stdout(`rho:io:stdout`) + #in { + # rl!(`rho:rchain:pos`, *poSCh) | + # for(@(_, Pos) <- poSCh) { + # @Pos!("newFringe", *sysAuthToken, *return) + # } + #}""".stripMargin('#') + + protected override val extractor = Extractor.derive + + protected override def 
processResult( + value: (Boolean, Either[String, Unit]) + ): Either[SystemDeployUserError, Unit] = value match { + case (true, _) => Right(()) + case (false, Left(errorMsg)) => Left(SystemDeployUserError(errorMsg)) + case _ => Left(SystemDeployUserError("")) + } +} + +object NewFringeDeploy { + def apply(prevFringe: Blake2b256Hash): NewFringeDeploy = + NewFringeDeploy(rand(prevFringe)) + + def rand(prevFringeHash: Blake2b256Hash): Blake2b512Random = BlockRandomSeed.randomGenerator( + "shardId", + 0, + PublicKey.apply(ByteString.EMPTY), + prevFringeHash + ) +} diff --git a/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala b/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala index 15ce2dbbc41..92c22f49aae 100644 --- a/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala +++ b/casper/src/test/scala/coop/rchain/casper/helper/BlockGenerator.scala @@ -45,7 +45,8 @@ object BlockGenerator { fringeRejectedDeploys = Set(), // Pre-state is the same as fringe state preStateHash = RuntimeManager.emptyStateHashFixed.toBlake2b256Hash, - rejectedDeploys = Set() + rejectedDeploys = Set(), + List() ) def step[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log: Metrics: Span]( diff --git a/casper/src/test/scala/coop/rchain/casper/merging/MergingCases.scala b/casper/src/test/scala/coop/rchain/casper/merging/MergingCases.scala index 125ddd7f655..610664db8fd 100644 --- a/casper/src/test/scala/coop/rchain/casper/merging/MergingCases.scala +++ b/casper/src/test/scala/coop/rchain/casper/merging/MergingCases.scala @@ -2,11 +2,17 @@ package coop.rchain.casper.merging import cats.effect.{IO, Resource} import cats.syntax.all._ +import com.google.protobuf.ByteString +import coop.rchain.casper.Validate.PublicKey import coop.rchain.casper.genesis.Genesis -import coop.rchain.casper.rholang.sysdeploys.CloseBlockDeploy +import coop.rchain.casper.protocol.DeployData +import coop.rchain.casper.rholang.sysdeploys.{CloseBlockDeploy, 
NewFringeDeploy} import coop.rchain.casper.rholang.{BlockRandomSeed, Resources, RuntimeManager} import coop.rchain.casper.syntax._ import coop.rchain.casper.util.{ConstructDeploy, GenesisBuilder} +import coop.rchain.crypto.{PrivateKey, PublicKey} +import coop.rchain.crypto.signatures.Signed +import coop.rchain.models.block.StateHash.StateHash import coop.rchain.models.syntax.modelsSyntaxByteString import coop.rchain.p2p.EffectsTestInstances.LogicalTime import coop.rchain.rholang.interpreter.SystemProcesses.BlockData @@ -32,6 +38,189 @@ class MergingCases extends AnyFlatSpec with Matchers { rm <- Resource.eval(Resources.mkRuntimeManagerAt[IO](kvm, mergeableTag)) } yield rm + "bond and fringe" should "" in effectTest { + runtimeManagerResource.use { runtimeManager => + { + val baseState = genesis.postStateHash + val seqNum = 1L + val blockNum = 1L + + def mk( + payer: PrivateKey, + stateTransitionCreator: coop.rchain.crypto.PublicKey, + term: String + ): IO[StateHash] = + for { + userDeploy <- ConstructDeploy.sourceDeployNowF[IO](term, sec = payer) + blockData = BlockData( + blockNum, + stateTransitionCreator, + seqNum + ) + rand = BlockRandomSeed.randomGenerator( + genesis.shardId, + blockNum, + stateTransitionCreator, + genesis.postStateHash.toBlake2b256Hash + ) + systemDeploys = Seq( + CloseBlockDeploy(rand.splitByte(2.toByte)), + NewFringeDeploy(rand.splitByte(3.toByte)) + ) + r <- runtimeManager.computeState(baseState)( + Seq(userDeploy), + systemDeploys, + rand, + blockData + ) + (postStateHash, processedDeploys, _) = r + + blkSender = stateTransitionCreator.bytes + mergeableChs <- runtimeManager.loadMergeableChannels(postStateHash, blkSender, seqNum) + + // Combine processed deploys with cached mergeable channels data + processedDeploysWithMergeable = processedDeploys.toVector.zip(mergeableChs) + + idx <- processedDeploysWithMergeable.traverse { + case (d, mergeChs) => + BlockIndex.createEventLogIndex( + d.deployLog, + runtimeManager.getHistoryRepo, + 
baseState.toBlake2b256Hash, + mergeChs + ) + } + rand = BlockRandomSeed.randomGenerator( + "shardId", + 0, + PublicKey.apply(ByteString.EMPTY), + postStateHash.toBlake2b256Hash + ) + _ <- runtimeManager.computeState(postStateHash)( + Seq(), + Seq(NewFringeDeploy(postStateHash.toBlake2b256Hash)), + rand, + BlockData(0, PublicKey.apply(ByteString.EMPTY), 0) + ) + _ = assert(idx.size == 1) + } yield postStateHash + + val payer1Key = genesisContext.genesisVaults.head._1 + val payer1PubKey = genesisContext.genesisVaults.head._2 + val payer2Key = genesisContext.genesisVaults.tail.head._1 + val sender1 = genesisContext.validatorKeyPairs.head._2 + val sender2 = genesisContext.validatorKeyPairs.tail.head._2 + + val d = + """new PoSCh, rl(`rho:registry:lookup`), stdout(`rho:io:stdout`), deployerId(`rho:rchain:deployerId`), rCh in { + | stdout!("About to lookup pos contract.") | + | rl!(`rho:rchain:pos`, *PoSCh) | + | for(@(_, PoS) <- PoSCh) { + | stdout!("About to bond") | + | @PoS!("bond", *deployerId, 3000, *rCh) | + | for (@r <- rCh) { + | stdout!(r) + | } + | } + |}""".stripMargin + + mk(payer1Key, sender1, d).flatMap { x => + runtimeManager.computeBonds(x).replicateA(20).map { x => + x.toSet.size shouldBe 1 + x.head.exists(_._1.toByteArray sameElements payer1PubKey.bytes) + } + } + } + } + } + + "" should "" in effectTest { + runtimeManagerResource.use { runtimeManager => + { + val baseState = genesis.postStateHash + val seqNum = 1L + val blockNum = 1L + + def mk( + payer: PrivateKey, + stateTransitionCreator: coop.rchain.crypto.PublicKey, + term: String + ): IO[EventLogIndex] = + for { + userDeploy <- ConstructDeploy.sourceDeployNowF[IO](term, sec = payer) + blockData = BlockData( + blockNum, + stateTransitionCreator, + seqNum + ) + rand = BlockRandomSeed.randomGenerator( + genesis.shardId, + blockNum, + stateTransitionCreator, + genesis.postStateHash.toBlake2b256Hash + ) + systemDeploys = Seq( + CloseBlockDeploy(rand.splitByte(2.toByte)), + 
NewFringeDeploy(rand.splitByte(3.toByte)) + ) + r <- runtimeManager.computeState(baseState)( + Seq(userDeploy), + systemDeploys, + rand, + blockData + ) + (postStateHash, processedDeploys, _) = r + + blkSender = stateTransitionCreator.bytes + mergeableChs <- runtimeManager.loadMergeableChannels(postStateHash, blkSender, seqNum) + + // Combine processed deploys with cached mergeable channels data + processedDeploysWithMergeable = processedDeploys.toVector.zip(mergeableChs) + + idx <- processedDeploysWithMergeable.traverse { + case (d, mergeChs) => + BlockIndex.createEventLogIndex( + d.deployLog, + runtimeManager.getHistoryRepo, + baseState.toBlake2b256Hash, + mergeChs + ) + } + _ = assert(idx.size == 1) + } yield idx.head + + val payer1Key = genesisContext.genesisVaults.head._1 + val payer2Key = genesisContext.genesisVaults.tail.head._1 + val sender1 = genesisContext.validatorKeyPairs.head._2 + val sender2 = genesisContext.validatorKeyPairs.tail.head._2 + + val d = + """new PoSCh, rl(`rho:registry:lookup`), stdout(`rho:io:stdout`), deployerId(`rho:rchain:deployerId`), rCh in { + | stdout!("About to lookup pos contract.") | + | rl!(`rho:rchain:pos`, *PoSCh) | + | for(@(_, PoS) <- PoSCh) { + | stdout!("About to bond") | + | @PoS!("bond", *deployerId, 3000, *rCh) | + | for (@r <- rCh) { + | stdout!(r) + | } + | } + |}""".stripMargin + +// val d = """new stdout(`rho:io:stdout`) in {stdout!("sdf;ldkfnds")}""" + + ( + mk(payer1Key, sender1, d).flatTap(_ => println("\n\n").pure[IO]), + mk(payer2Key, sender2, d) + ).mapN { + case (l, r) => + val conflicts = EventLogMergingLogic.areConflicting(l, r) + println(conflicts) + } + } + } + } + /** * Two deploys inside single state transition are using the same PVV for precharge and refund. * So this should be dependent over produce that puts new value into PVV balance in the first deploy. 
diff --git a/models/src/main/protobuf/CasperMessage.proto b/models/src/main/protobuf/CasperMessage.proto index f3d2bdd8280..e36cdddacb9 100644 --- a/models/src/main/protobuf/CasperMessage.proto +++ b/models/src/main/protobuf/CasperMessage.proto @@ -148,10 +148,14 @@ message SlashSystemDeployDataProto { message CloseBlockSystemDeployDataProto{ } +message NewFringeSystemDeployDataProto{ +} + message SystemDeployDataProto{ oneof systemDeploy{ SlashSystemDeployDataProto slashSystemDeploy = 1; CloseBlockSystemDeployDataProto closeBlockSystemDeploy = 2; + NewFringeSystemDeployDataProto newFringeSystemDeploy = 3; } } diff --git a/models/src/main/protobuf/DeployServiceCommon.proto b/models/src/main/protobuf/DeployServiceCommon.proto index b48371738b0..ba9cdd3af75 100644 --- a/models/src/main/protobuf/DeployServiceCommon.proto +++ b/models/src/main/protobuf/DeployServiceCommon.proto @@ -127,6 +127,7 @@ message LightBlockInfo { string sender = 5; int64 seqNum = 6; string preStateHash = 7; + string finStateHash = 18; string postStateHash = 8; repeated string justifications = 9; repeated BondInfo bonds = 10; @@ -148,6 +149,7 @@ message LightBlockInfo { message BlockInfo { LightBlockInfo blockInfo = 1 [(scalapb.field).no_box = true]; repeated DeployInfo deploys = 2; + bool changedEpoch = 3; } message DataWithBlockInfo { diff --git a/models/src/main/scala/coop/rchain/casper/PrettyPrinter.scala b/models/src/main/scala/coop/rchain/casper/PrettyPrinter.scala index 15c93721d92..cac2258a93d 100644 --- a/models/src/main/scala/coop/rchain/casper/PrettyPrinter.scala +++ b/models/src/main/scala/coop/rchain/casper/PrettyPrinter.scala @@ -59,5 +59,5 @@ object PrettyPrinter { s"DeployData #${d.timestamp} -- ${d.term}" def buildString(hashes: Iterable[BlockHash]): String = - hashes.map(PrettyPrinter.buildString).mkString("[", " ", "]") + hashes.toList.map(PrettyPrinter.buildString).mkString("[", " ", "]") } diff --git a/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala 
b/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala index dccd78a0bee..825807b3ad4 100644 --- a/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala +++ b/models/src/main/scala/coop/rchain/casper/protocol/CasperMessage.scala @@ -212,14 +212,16 @@ object BlockMessage { .withSigAlgorithm(bm.sigAlgorithm) .withSig(bm.sig) .withView( - bm.view.seen.map { case (v, r) => ViewProto(v, r.start.toLong, r.end.toLong) }.toList + bm.view.seen.toList.sortBy(_._1).map { + case (v, r) => ViewProto(v, r.start.toLong, r.end.toLong) + } ) .withFringe(bm.fringe) .withFinStateHash(bm.finStateHash) .withMergeables(bm.mergeables.map { x => - DeploysChannelDiff(x.map { + DeploysChannelDiff(x.toList.sortBy(_._1).map { case (c, d) => ChannelDiffs.apply(c.toByteString, d) - }.toList) + }) }.toList) } @@ -301,6 +303,7 @@ sealed trait SystemDeployData final case class SlashSystemDeployData(slashedValidator: Validator) extends SystemDeployData case object CloseBlockSystemDeployData extends SystemDeployData +case object NewFringeSystemDeployData extends SystemDeployData case object Empty extends SystemDeployData object SystemDeployData { @@ -309,15 +312,20 @@ object SystemDeployData { def from(slashedValidator: Validator): SystemDeployData = SlashSystemDeployData(slashedValidator) - def from(): SystemDeployData = + def closeBlock(): SystemDeployData = CloseBlockSystemDeployData + def newFringe(): SystemDeployData = + NewFringeSystemDeployData + def fromProto(proto: SystemDeployDataProto): SystemDeployData = proto.systemDeploy match { case SystemDeployDataProto.SystemDeploy.SlashSystemDeploy(sd) => SlashSystemDeployData(sd.slashedValidator) case SystemDeployDataProto.SystemDeploy.CloseBlockSystemDeploy(_) => CloseBlockSystemDeployData + case SystemDeployDataProto.SystemDeploy.NewFringeSystemDeploy(_) => + NewFringeSystemDeployData case _ => Empty } @@ -331,6 +339,10 @@ object SystemDeployData { SystemDeployDataProto().withCloseBlockSystemDeploy( 
CloseBlockSystemDeployDataProto() ) + case NewFringeSystemDeployData => + SystemDeployDataProto().withNewFringeSystemDeploy( + NewFringeSystemDeployDataProto() + ) case Empty => SystemDeployDataProto() } } diff --git a/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala b/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala index 0f0d0fd0869..cc4f5ac047a 100644 --- a/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala +++ b/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala @@ -8,6 +8,7 @@ trait AdminWebApi[F[_]] { def propose: F[String] def proposeResult: F[String] def vDag(depth: Int, startBlockNumber: Int, showJs: Boolean): F[String] + def replay(hash: String): F[String] } object AdminWebApi { @@ -25,5 +26,8 @@ object AdminWebApi { .visualizeDag(depth, startBlockNumber, showJs) .flatMap(_.liftToBlockApiErr) .map(_.mkString) + + override def replay(hash: String): F[String] = + blockApi.replay(hash).flatMap(_.liftToBlockApiErr) } } diff --git a/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala b/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala index e47d0f67539..776819fc980 100644 --- a/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala +++ b/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala @@ -24,6 +24,15 @@ trait WebApiAdminEndpoints docs = EndpointDocs().withDescription("Render dag in DOT format".some) ) + val replay: Endpoint[String, String] = endpoint( + get(path / "replay" / blockHash), + ok(textResponse), + docs = EndpointDocs().withDescription("Replay a block by hash".some) + ) + private lazy val dagDepth = segment[Int](name = "depth", docs = "Depth of the Dag to render".some) + + private lazy val blockHash = + segment[Int](name = "block hash", docs = "Hash of a block".some) } diff --git a/node/src/main/scala/coop/rchain/node/web/WebApiDocsV1.scala b/node/src/main/scala/coop/rchain/node/web/WebApiDocsV1.scala index dfaf7e87df7..ea4a9035cd4
100644 --- a/node/src/main/scala/coop/rchain/node/web/WebApiDocsV1.scala +++ b/node/src/main/scala/coop/rchain/node/web/WebApiDocsV1.scala @@ -30,7 +30,7 @@ object WebApiDocs getBlock ) - val admin = Seq(propose, vDag) + val admin = Seq(propose, vDag, replay) // Public API Open API schema val publicApi: OpenApi = openApi(Info(title = "RNode API", version = "1.0"))(public: _*) diff --git a/node/src/main/scala/coop/rchain/node/web/WebApiRoutesV1.scala b/node/src/main/scala/coop/rchain/node/web/WebApiRoutesV1.scala index e81a0a8968a..a111113313a 100644 --- a/node/src/main/scala/coop/rchain/node/web/WebApiRoutesV1.scala +++ b/node/src/main/scala/coop/rchain/node/web/WebApiRoutesV1.scala @@ -90,6 +90,7 @@ final case class AdminWebApiRoutesV1[F[_]: Concurrent]( val adminRoutes = routesFromEndpoints( // Propose propose.implementedByEffect(const(adminWebApi.propose)), - vDag.implementedByEffect(adminWebApi.vDag(_: Int, 0, showJs = true)) + vDag.implementedByEffect(adminWebApi.vDag(_: Int, 0, showJs = true)), + replay.implementedByEffect(adminWebApi.replay) ) } diff --git a/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala b/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala index 7577cdef883..92d2ead073e 100644 --- a/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala +++ b/sdk/src/main/scala/coop/rchain/sdk/dag/merging/ConflictResolutionLogic.scala @@ -234,12 +234,18 @@ object ConflictResolutionLogic { val conflictSetCompatible = conflictSet -- enforceRejected // conflict map accounting for dependencies val fullConflictsMap = - conflictsMap.view.mapValues(vs => vs ++ withDependencies(vs, dependencyMap)).toMap + conflictsMap.view + .filterKeys(conflictSetCompatible) + .mapValues( + vs => + vs ++ withDependencies(vs, dependencyMap.view.filterKeys(conflictSetCompatible).toMap) + ) + .toMap // find rejection combinations possible val rejectionOptions = computeRejectionOptions(fullConflictsMap) // 
add to rejection options rejections caused by mergeable channels overflow val mergeableOverflowRejectionOptions = addMergeableOverflowRejections( - conflictSet, + conflictSetCompatible, dependencyMap, rejectionOptions, initMergeableValues, @@ -247,6 +253,14 @@ object ConflictResolutionLogic { ) // find optimal rejection val resolved = computeOptimalRejection(mergeableOverflowRejectionOptions, cost) - (conflictSetCompatible -- resolved, resolved ++ enforceRejected) + val accept = conflictSetCompatible -- resolved + val reject = resolved ++ enforceRejected + + assert( + accept.size + reject.size == conflictSet.size, + s"accept ${accept.size} rj ${reject.size}, total ${conflictSet.size}" + ) + + (accept, reject) } } From ea368513c7201dcb25568aad9222747ded6c26f8 Mon Sep 17 00:00:00 2001 From: nutzipper <1746367+nzpr@users.noreply.github.com> Date: Wed, 29 May 2024 17:51:43 +0400 Subject: [PATCH 2/3] test --- casper/src/main/resources/Pos.rhox | 18 ++++++++++++++++++ .../coop/rchain/casper/api/BlockApi.scala | 2 +- .../coop/rchain/casper/api/BlockApiImpl.scala | 17 ++++++++++++----- .../rchain/casper/rholang/RuntimeManager.scala | 3 ++- .../casper/rholang/syntax/RuntimeSyntax.scala | 4 +++- .../rholang/sysdeploys/NewFringeDeploy.scala | 2 ++ .../coop/rchain/node/api/AdminWebApi.scala | 2 +- .../node/api/v1/WebApiAdminEndpoints.scala | 2 +- 8 files changed, 40 insertions(+), 10 deletions(-) diff --git a/casper/src/main/resources/Pos.rhox b/casper/src/main/resources/Pos.rhox index b1455c43944..5b523e9c8cb 100644 --- a/casper/src/main/resources/Pos.rhox +++ b/casper/src/main/resources/Pos.rhox @@ -521,30 +521,46 @@ in { commitCurrentEpochRewards, updatedRewardsCh, movePendingWithdrawCh, movePendingWithdrawer, addToBodsMap, newBondsMapCh in { + stdout!("newFringe") | PoS!("getCurrentEpochRewards", *currentEpochRewardCh) | for (@currentRewards <- currentEpochRewardCh) { + stdout!("currentEpochRewardCh") | + stdout!(currentRewards) | runMVar!(*stateCh, *stateProcessCh, 
*ackCh) | for (@state, stateUpdateCh <- stateProcessCh) { // 1. Calculate all the validator current epoch rewards based on active validator bonding proportion // 2. Accumulate `committedRewards` with the result of step 1 + stdout!("state, stateUpdateCh") | commitCurrentEpochRewards!(state, currentRewards, *updatedRewardsCh) | for (@newCommittedState <- updatedRewardsCh) { + stdout!(newCommittedState) | + stdout!("newCommittedState") | // 3. Update `withdrawers` according to `allBonds` and `pendingWithdrawers` // when pendingWithdrawers is not empty, remove the validator in `allBonds` movePendingWithdrawer!(newCommittedState, *movePendingWithdrawCh) | for (@newMovedPendingState <- movePendingWithdrawCh){ + stdout!("newMovedPendingState") | + stdout!(newMovedPendingState) | // 4. When the block number reaches the quantinue of `withdrawers` , // transfer the corresponding bonds+ rewards to the corresponding validator from posVault. // 5. update `pendingWithdrawers` to empty Map {}. removeQuarantinedWithdrawers!(0, newMovedPendingState, *removeQuarantinedCh) | for (@newRemovedQuantinueState <- removeQuarantinedCh) { // 6. Put requests for bonding into a bonds map TODO this is not really needed + stdout!("newRemovedQuantinueState") | + stdout!(newRemovedQuantinueState) | for (@bRequests <<- bStateCh) { + stdout!("bRequests") | + stdout!(bRequests) | addToBodsMap!(newRemovedQuantinueState, bRequests, *newBondsMapCh) | for (@newBondsMap <- newBondsMapCh) { // 7. 
Pick new `activeValidators` + stdout!("newBondsMap") | + stdout!(newBondsMap) | pickActiveValidators!(newBondsMap.get("allBonds").toList(), *newValidatorsCh) | for (@newValidators <- newValidatorsCh) { + stdout!("newValidators") | + stdout!(newValidators) | stateUpdateCh!(newBondsMap.set("activeValidators", newValidators), (true, Nil)) } } @@ -569,8 +585,10 @@ in { } | contract addToBodsMap(@state, @bondRequests, returnCh) = { new accumulate in { + stdout!("accumulate") | @ListOps!("fold", bondRequests.toList(), state, *accumulate, *returnCh) | contract accumulate(@(pk, bond), @s, resultCh) = { + stdout!("step") | if (s.get("allBonds").contains(pk)) { resultCh!(s) } else { diff --git a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala index 7c499529617..16e54e64403 100644 --- a/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala +++ b/casper/src/main/scala/coop/rchain/casper/api/BlockApi.scala @@ -75,7 +75,7 @@ trait BlockApi[F[_]] { def getLatestMessage: F[ApiErr[BlockMetadata]] - def replay(hash: String): F[ApiErr[Unit]] + def replay(hash: String): F[ApiErr[String]] } object BlockApi { diff --git a/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala b/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala index a5142a3bee7..d8682723ec1 100644 --- a/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala +++ b/casper/src/main/scala/coop/rchain/casper/api/BlockApiImpl.scala @@ -691,17 +691,24 @@ class BlockApiImpl[F[_]: Async: RuntimeManager: BlockDagStorage: BlockStore: Log .attempt .map(_.leftMap(_.getMessageSafe)) - override def replay(hash: String): F[ApiErr[Unit]] = + override def replay(hash: String): F[ApiErr[String]] = for { blockHash <- hash.hexToByteString.liftTo( new Exception(s"Invalid block hash base 16 encoding, $hash") ) - block <- BlockStore[F].getUnsafe(blockHash) - _ <- { + block <- BlockStore[F].get1(blockHash) + r <- { implicit val m: 
Metrics.MetricsNOP[F] = new Metrics.MetricsNOP() - InterpreterUtil.validateBlockCheckpoint(block) + block + .traverse( + InterpreterUtil + .validateBlockCheckpoint[F](_) + .map(_ => "OK") + ) + .map(_.toRight(s"No block $blockHash in block store")) + } - } yield () + } yield r private def getDataAtParRaw( par: Par, diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala b/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala index 8f39e884cfb..7aecfdf018f 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/RuntimeManager.scala @@ -228,7 +228,8 @@ final case class RuntimeManagerImpl[F[_]: Async: Metrics: Span: Log: Parallel: C def computeBonds(hash: StateHash): F[Map[Validator, Long]] = (getBonds(hash), getActiveValidators(hash)).mapN { - case (all, active) => all.view.filterKeys(active.toSet).toMap + case (all, active) => + all.view.filterKeys(active.toSet).toMap } // Executes deploy as user deploy with immediate rollback diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala index 724acc6c8a6..2e841f15248 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeSyntax.scala @@ -610,9 +610,11 @@ final class RuntimeOps[F[_]](private val runtime: RhoRuntime[F]) extends AnyVal private def bondsQuerySource: String = s""" - # new return, rl(`rho:registry:lookup`), poSCh in { + # new return, rl(`rho:registry:lookup`), poSCh, stdout(`rho:io:stdout`) in { + # stdout!("return") | # rl!(`rho:rchain:pos`, *poSCh) | # for(@(_, Pos) <- poSCh) { + # stdout!("poSCh") | # @Pos!("getBonds", *return) # } # } diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala 
b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala index c0ce7ea0ac2..572d9b7f4e9 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/sysdeploys/NewFringeDeploy.scala @@ -34,8 +34,10 @@ final case class NewFringeDeploy(initialRand: Blake2b512Random) extends SystemDe # sysAuthToken(`sys:casper:authToken`), # return(`sys:casper:return`), stdout(`rho:io:stdout`) #in { + # stdout!("new Fringe query") | # rl!(`rho:rchain:pos`, *poSCh) | # for(@(_, Pos) <- poSCh) { + # stdout!("new Fringe Pos got") | # @Pos!("newFringe", *sysAuthToken, *return) # } #}""".stripMargin('#') diff --git a/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala b/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala index cc4f5ac047a..5863fad6a59 100644 --- a/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala +++ b/node/src/main/scala/coop/rchain/node/api/AdminWebApi.scala @@ -28,6 +28,6 @@ object AdminWebApi { .map(_.mkString) override def replay(hash: String): F[String] = - blockApi.replay(hash).flatMap(_.liftToBlockApiErr) + blockApi.replay(hash).flatMap(_.liftToBlockApiErr[F]) } } diff --git a/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala b/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala index 776819fc980..49152dbf544 100644 --- a/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala +++ b/node/src/main/scala/coop/rchain/node/api/v1/WebApiAdminEndpoints.scala @@ -34,5 +34,5 @@ trait WebApiAdminEndpoints segment[Int](name = "depth", docs = "Depth of the Dag to render".some) private lazy val blockHash = - segment[Int](name = "block hash", docs = "Hash of a block".some) + segment[String](name = "block hash", docs = "Hash of a block".some) } From 8a3298661d8e8244907a1ed48834c5e52456d6de Mon Sep 17 00:00:00 2001 From: nutzipper <1746367+nzpr@users.noreply.github.com> Date: Thu, 30 May 
2024 13:32:29 +0400 Subject: [PATCH 3/3] WIP1 --- .../rchain/casper/MultiParentCasper.scala | 21 ++++++++++++++++++- .../casper/dag/BlockDagKeyValueStorage.scala | 17 +++++++++++++-- .../rchain/casper/merging/MergeScope.scala | 2 +- .../rholang/syntax/RuntimeReplaySyntax.scala | 7 ++++++- 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala index e8d0ff990ac..2ecf8f7db06 100644 --- a/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala +++ b/casper/src/main/scala/coop/rchain/casper/MultiParentCasper.scala @@ -85,6 +85,10 @@ object MultiParentCasper { prevFringeState = fringeRecord.stateHash prevFringeRejectedDeploys = fringeRecord.rejectedDeploys prevFringeStateHash = prevFringeState.toByteString + + _ = println( + s"prevFringeHashes ${prevFringeHashes.map(_.show.take(8))} prevFringeState: ${prevFringeState}" + ) // TODO: for empty fringe bonds map should be loaded from bonds file (if validated in replay) bondsMap <- if (prevFringe.isEmpty) justifications.head.bondsMap.pure[F] @@ -123,6 +127,20 @@ object MultiParentCasper { RuntimeManager[F].getHistoryRepo, BlockIndex.getBlockIndex[F](_) ) +// _ = println( +// s"baseOpt $baseOpt merging into ${baseStateOpt +// .getOrElse(prevFringeState)} (${prevFringeHashes})" +// ) +// _ = println(dag.fringeStates) +// _ = println(s"newFringe ${fringe.map(_.toHexString.take(8))}") +// _ = println(s"baseStateOpt $baseStateOpt") +// _ = println( +// s"finalScope ${mScope.finalScope.map(_.toHexString.take(8))}" +// ) +// _ = println( +// s"conflictScope ${mScope.conflictScope.map(_.toHexString.take(8))}" +// ) +// _ = println(s"merge $result") // this rand does not mean anything here since it is used to only compute deploys, // which are empty rand = BlockRandomSeed.randomGenerator( @@ -202,7 +220,8 @@ object MultiParentCasper { val rejectedDeploysStr = 
PrettyPrinter.buildString(rejected) val mergedDeploysStr = PrettyPrinter.buildString(merged) val msgFinalized = - s"New finalized fringe state: $finalizedStateStr, rejectedDeploys: $rejectedDeploysStr, merged: $mergedDeploysStr" + s"New finalized fringe state: $finalizedStateStr, old ${PrettyPrinter .buildString(prevFringeStateHash)} rejectedDeploys: $rejectedDeploysStr, merged: $mergedDeploysStr" Log[F].info(msgFinalized) } } diff --git a/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala b/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala index 58eee36456a..d21a92e55da 100644 --- a/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala +++ b/casper/src/main/scala/coop/rchain/casper/dag/BlockDagKeyValueStorage.scala @@ -85,17 +85,30 @@ final class BlockDagKeyValueStorage[F[_]: Async: Log] private ( // fringeDiffMetas <- fringeDiffHashes.toList.traverse(blockMetadataIndex.getUnsafe) // fringeDiffMetasUpdated = fringeDiffMetas.map(_.copy(memberOfFringe = fringeHash.some)) // _ <- fringeDiffMetasUpdated.traverse(blockMetadataIndex.add) + metadataStateHash = blockMetadata.fringeStateHash.toBlake2b256Hash fringeData = FringeData( fringeHash, fringe = blockMetadata.fringe, //fringeDiff = fringeDiffHashes, - stateHash = blockMetadata.fringeStateHash.toBlake2b256Hash, + stateHash = metadataStateHash, rejectedDeploys = block.rejectedDeploys, rejectedBlocks = block.rejectedBlocks, rejectedSenders = block.rejectedSenders ) // Save to fringe data store - _ <- fringeDataStore.put(fringeHash, fringeData) + shouldSave <- fringeDataStore.get1(fringeHash).flatMap { + case Some(fd) => + FatalError( + s"Attempt to add block with equivocating fringe state hash. " + + s"Fringe: ${blockMetadata.fringe.map(_.toHexString.take(6))}, " + + s"persisted: ${fd.stateHash}, " + + s"attempting to add: $metadataStateHash."
+ ).raiseError + .whenA(fd.stateHash != metadataStateHash) + .as(false) + case None => true.pure + } + _ <- fringeDataStore.put(fringeHash, fringeData).whenA(shouldSave) // Update in-mem indices dag <- representationState.get diff --git a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala index 6c136c07d87..39a5d4469a1 100644 --- a/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala +++ b/casper/src/main/scala/coop/rchain/casper/merging/MergeScope.scala @@ -219,7 +219,7 @@ object MergeScope { overallChanges = s"${allChanges.datumsChanges.size} D, ${allChanges.kontChanges.size} K, ${allChanges.consumeChannelsToJoinSerializedMap.size} J" logStr = s"Merging done. Changes: $overallChanges; " + s"trie actions (${trieActions.size}) computed in ${computeActionsTime}; " + - s"actions applied in ${applyActionsTime}" + s"actions applied in ${applyActionsTime}. ${baseState} => ${newState}" _ <- Log[F].debug(logStr) } yield newState } diff --git a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala index 3c78705c8a2..dffcb374c1a 100644 --- a/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala +++ b/casper/src/main/scala/coop/rchain/casper/rholang/syntax/RuntimeReplaySyntax.scala @@ -127,6 +127,9 @@ final class RuntimeReplayOps[F[_]](private val runtime: ReplayRhoRuntime[F]) ext val sysDeploys = { if (systemDeploys.size == 1 && systemDeploys.head.systemDeploy == NewFringeSystemDeployData) { val rnd = NewFringeDeploy.rand(startHash.toBlake2b256Hash) + println( + s"Replaying rand ${(Blake2b256Hash.fromByteArray(rnd.copy().next()))}" + ) replaySystemDeploy(systemDeploys.head, rnd).map(_.map(Vector(_))) } else (systemDeploys, Vector[NumberChannelsEndVal](), terms.length).tailRecM { @@ -310,7 +313,9 @@ final class RuntimeReplayOps[F[_]](private 
val runtime: ReplayRhoRuntime[F]) ext } ).map(_._1) case NewFringeSystemDeployData => - println(s"Replaying NewFringeSystemDeployData") + println( + s"Replaying NewFringeSystemDeployData rand ${(Blake2b256Hash.fromByteArray(rand.copy().next()))}" + ) val newFringeDeploy = NewFringeDeploy(rand) rigWithCheck( processedSysDeploy,