diff --git a/.github/actions/nix-cachix-setup/action.yml b/.github/actions/nix-cachix-setup/action.yml index fcf19333bb5..6c24c24a284 100644 --- a/.github/actions/nix-cachix-setup/action.yml +++ b/.github/actions/nix-cachix-setup/action.yml @@ -9,6 +9,21 @@ runs: using: composite steps: + - name: ๐Ÿงน Free disk space + if: runner.os == 'Linux' + shell: bash + run: | + echo "Disk space before cleanup:" + df -h / + # Remove unnecessary tools to free up disk space + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + sudo docker image prune --all --force || true + echo "Disk space after cleanup:" + df -h / + - name: โ„ Prepare nix uses: cachix/install-nix-action@v30 with: diff --git a/.github/workflows/ci-nix.yaml b/.github/workflows/ci-nix.yaml index 8477afbd02d..d3843fa4815 100644 --- a/.github/workflows/ci-nix.yaml +++ b/.github/workflows/ci-nix.yaml @@ -11,7 +11,11 @@ on: branches: - master - release + - v1.2.0-base pull_request: + branches: + - master + - v1.2.0-base schedule: # Everyday at 4:00 AM - cron: "0 4 * * *" diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index e0021638606..f9262c0609f 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -11,8 +11,10 @@ concurrency: on: push: - branches: [ "master" ] + branches: [ "master", "v1.2.0-base" ] tags: [ "*.*.*" ] + pull_request: + branches: [ "master", "v1.2.0-base" ] workflow_dispatch: inputs: ref_name: @@ -73,6 +75,12 @@ jobs: # And the version as the git commit. VERSION=${{github.sha}} + # For PRs, tag as pr- + if [[ "${{github.event_name}}" == "pull_request" ]]; then + IMAGE_LABEL=pr-${{github.event.pull_request.number}} + VERSION=${{github.event.pull_request.head.sha}} + fi + # Determine whether we are building a tag and if yes, set the label # name to be the tag name, and the version to be the tag. BUILDING_TAG=${{github.ref_type == 'tag'}} diff --git a/hydra-node/src/Hydra/HeadLogic.hs b/hydra-node/src/Hydra/HeadLogic.hs index ab4c088c702..d6487d5175b 100644 --- a/hydra-node/src/Hydra/HeadLogic.hs +++ b/hydra-node/src/Hydra/HeadLogic.hs @@ -24,6 +24,8 @@ import Hydra.Prelude import Data.List (elemIndex, minimumBy) import Data.Map.Strict qualified as Map +import Data.Sequence (Seq (Empty), (|>)) +import Data.Sequence qualified as Seq import Data.Set ((\\)) import Data.Set qualified as Set import Hydra.API.ClientInput (ClientInput (..)) @@ -73,7 +75,6 @@ import Hydra.HeadLogic.State ( ) import Hydra.Ledger ( Ledger (..), - applyTransactions, ) import Hydra.Network qualified as Network import Hydra.Network.Message (Message (..), NetworkEvent (..)) @@ -342,7 +343,7 @@ onOpenNetworkReqTx env ledger currentSlot st ttl tx = -- spec. Do we really need to store that we have -- requested a snapshot? If yes, should update spec. <> newState SnapshotRequestDecided{snapshotNumber = nextSn} - <> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs') decommitTx currentDepositTxId) + <> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs') decommitTx currentDepositTxId) else outcome Environment{party} = env @@ -371,7 +372,7 @@ onOpenNetworkReqTx env ledger currentSlot st ttl tx = -- NOTE: Order of transactions is important here. See also -- 'pruneTransactions'. - localTxs' = localTxs <> [tx] + localTxs' = localTxs |> tx -- | Process a snapshot request ('ReqSn') from party. -- @@ -543,21 +544,18 @@ onOpenNetworkReqSn env ledger pendingDeposits currentSlot st otherParty sv sn re Error $ RequireFailed $ SnapshotDoesNotApply sn (txId tx) err Right u -> cont u - pruneTransactions utxo = do - -- NOTE: Using foldl' is important to apply transacations in the correct + pruneTransactions utxo = + -- NOTE: Using collectTransactions applies transactions in the correct -- order. That is, left-associative as new transactions are first validated -- and then appended to `localTxs` (when aggregating -- 'TransactionAppliedToLocalUTxO'). - foldl' go ([], utxo) localTxs - where - go (txs, u) tx = - -- XXX: We prune transactions on any error, while only some of them are - -- actually expected. - -- For example: `OutsideValidityIntervalUTxO` ledger errors are expected - -- here when a tx becomes invalid. - case applyTransactions ledger currentSlot u [tx] of - Left (_, _) -> (txs, u) - Right u' -> (txs <> [tx], u') + -- XXX: We prune transactions on any error, while only some of them are + -- actually expected. + -- For example: `OutsideValidityIntervalUTxO` ledger errors are expected + -- here when a tx becomes invalid. + -- OPTIMIZATION: Uses Ledger's collectTransactions which works directly with + -- Seq to avoid list conversions. Also uses O(1) Seq append internally. + collectTransactions ledger currentSlot utxo localTxs confSn = case confirmedSnapshot of InitialSnapshot{} -> 0 @@ -690,13 +688,30 @@ onOpenNetworkAckSn Environment{party} pendingDeposits openState otherParty snaps maybeRequestNextSnapshot previous outcome = do let nextSn = previous.number + 1 - if isLeader parameters party nextSn && not (null localTxs) + -- Check for active deposits that can be picked up (only if no deposit already in progress) + nextActiveDeposit = + if isNothing currentDepositTxId && isNothing decommitTx + then getNextActiveDeposit pendingDeposits + else Nothing + -- Use current deposit if in progress, otherwise use next active deposit + depositToInclude = currentDepositTxId <|> nextActiveDeposit + -- Request snapshot if we have pending txs OR a deposit to process + shouldRequest = isLeader parameters party nextSn && (not (Seq.null localTxs) || isJust depositToInclude) + if shouldRequest then outcome <> newState SnapshotRequestDecided{snapshotNumber = nextSn} - <> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) decommitTx currentDepositTxId) + <> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs) decommitTx depositToInclude) else outcome + -- \| Get the next active deposit to include in a snapshot request (FIFO by creation time) + getNextActiveDeposit :: (Eq (UTxOType tx), Monoid (UTxOType tx)) => Map (TxIdType tx) (Deposit tx) -> Maybe (TxIdType tx) + getNextActiveDeposit deposits = + let isActive (_, Deposit{deposited, status}) = deposited /= mempty && status == Active + in case filter isActive (Map.toList deposits) of + [] -> Nothing + xs -> Just $ fst $ minimumBy (comparing ((\Deposit{created} -> created) . snd)) xs + maybePostIncrementTx snapshot@Snapshot{utxoToCommit} signatures outcome = -- TODO: check status (again)? case find (\(_, Deposit{deposited}) -> Just deposited == utxoToCommit) $ Map.toList pendingDeposits of @@ -890,7 +905,7 @@ onOpenNetworkReqDec env ledger ttl currentSlot openState decommitTx = maybeRequestSnapshot = if not snapshotInFlight && isLeader parameters party nextSn - then cause (NetworkEffect (ReqSn version nextSn (txId <$> localTxs) (Just decommitTx) Nothing)) + then cause (NetworkEffect (ReqSn version nextSn (toList $ txId <$> localTxs) (Just decommitTx) Nothing)) else noop Environment{party} = env @@ -992,7 +1007,7 @@ onOpenChainTick env chainTime pendingDeposits st = -- requested a snapshot? If yes, should update spec. newState SnapshotRequestDecided{snapshotNumber = nextSn} -- Spec: multicast (reqSn,ฬ‚ ๐‘ฃ,ฬ„ ๐’ฎ.๐‘  + 1,ฬ‚ ๐’ฏ, ๐‘ˆ๐›ผ, โŠฅ) - <> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) Nothing (Just depositTxId)) + <> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs) Nothing (Just depositTxId)) else noop where @@ -1498,6 +1513,13 @@ aggregateNodeState nodeState sc = ns{currentSlot = chainSlot} ChainRolledBack{chainState} -> ns{currentSlot = chainStateSlot chainState} + -- Restore full NodeState from checkpoint, including pendingDeposits + Checkpoint NodeState{headState = checkpointHeadState, pendingDeposits = checkpointDeposits, currentSlot = checkpointSlot} -> + NodeState + { headState = checkpointHeadState + , pendingDeposits = checkpointDeposits + , currentSlot = checkpointSlot + } _ -> ns -- * HeadState aggregate @@ -1552,7 +1574,7 @@ aggregate st = \case CoordinatedHeadState { localUTxO = initialUTxO , allTxs = mempty - , localTxs = mempty + , localTxs = Empty , confirmedSnapshot = InitialSnapshot{headId, initialUTxO} , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -1587,7 +1609,7 @@ aggregate st = \case { localUTxO = newLocalUTxO , -- NOTE: Order of transactions is important here. See also -- 'pruneTransactions'. - localTxs = localTxs <> [tx] + localTxs = localTxs |> tx } } where @@ -1675,14 +1697,14 @@ aggregate st = \case InitialSnapshot{initialUTxO} -> coordinatedHeadState { localUTxO = initialUTxO - , localTxs = mempty + , localTxs = Empty , allTxs = mempty , seenSnapshot = NoSeenSnapshot } ConfirmedSnapshot{snapshot = Snapshot{utxo}} -> coordinatedHeadState { localUTxO = utxo - , localTxs = mempty + , localTxs = Empty , allTxs = mempty , seenSnapshot = LastSeenSnapshot snapshotNumber } diff --git a/hydra-node/src/Hydra/HeadLogic/Input.hs b/hydra-node/src/Hydra/HeadLogic/Input.hs index 14291998e57..f212af24c16 100644 --- a/hydra-node/src/Hydra/HeadLogic/Input.hs +++ b/hydra-node/src/Hydra/HeadLogic/Input.hs @@ -1,17 +1,46 @@ {-# LANGUAGE UndecidableInstances #-} -module Hydra.HeadLogic.Input where +module Hydra.HeadLogic.Input ( + Input (..), + TTL, + MessagePriority (..), + inputPriority, +) where import Hydra.Prelude import Hydra.API.ClientInput (ClientInput) import Hydra.Chain (ChainEvent) import Hydra.Chain.ChainState (IsChainState) -import Hydra.Network.Message (Message, NetworkEvent) +import Hydra.Network.Message (Message (..), NetworkEvent (..)) import Hydra.Tx.IsTx (ArbitraryIsTx) type TTL = Natural +-- | Priority level for input messages. Protocol messages (ReqSn, AckSn) get +-- high priority to prevent them from being delayed by transaction messages +-- under high load. +data MessagePriority = HighPriority | LowPriority + deriving stock (Eq, Show, Generic) + +-- | Classify an input by its priority. Protocol messages that are critical +-- for snapshot progress get high priority, while transaction submissions +-- get low priority. +inputPriority :: Input tx -> MessagePriority +inputPriority = \case + -- Protocol messages: high priority to ensure snapshot progress + NetworkInput{networkEvent = ReceivedMessage{msg = ReqSn{}}} -> HighPriority + NetworkInput{networkEvent = ReceivedMessage{msg = AckSn{}}} -> HighPriority + -- Connectivity events: high priority for protocol health + NetworkInput{networkEvent = ConnectivityEvent{}} -> HighPriority + -- Transaction requests: low priority (can be delayed under load) + NetworkInput{networkEvent = ReceivedMessage{msg = ReqTx{}}} -> LowPriority + NetworkInput{networkEvent = ReceivedMessage{msg = ReqDec{}}} -> LowPriority + -- Client inputs: high priority (user-initiated actions) + ClientInput{} -> HighPriority + -- Chain events: high priority (must be processed promptly) + ChainInput{} -> HighPriority + -- | Inputs that are processed by the head logic (the "core"). Corresponding to -- each of the "shell" layers, we distinguish between inputs from the client, -- the network and the chain. diff --git a/hydra-node/src/Hydra/HeadLogic/Outcome.hs b/hydra-node/src/Hydra/HeadLogic/Outcome.hs index b70cf5e0dd4..64a5fb20aaa 100644 --- a/hydra-node/src/Hydra/HeadLogic/Outcome.hs +++ b/hydra-node/src/Hydra/HeadLogic/Outcome.hs @@ -94,7 +94,7 @@ data StateChanged tx { snapshot :: Snapshot tx , requestedTxIds :: [TxIdType tx] , newLocalUTxO :: UTxOType tx - , newLocalTxs :: [tx] + , newLocalTxs :: Seq tx , newCurrentDepositTxId :: Maybe (TxIdType tx) } | PartySignedSnapshot {snapshot :: Snapshot tx, party :: Party, signature :: Signature (Snapshot tx)} diff --git a/hydra-node/src/Hydra/HeadLogic/State.hs b/hydra-node/src/Hydra/HeadLogic/State.hs index b44291ca701..4c763ac7118 100644 --- a/hydra-node/src/Hydra/HeadLogic/State.hs +++ b/hydra-node/src/Hydra/HeadLogic/State.hs @@ -162,9 +162,10 @@ data CoordinatedHeadState tx = CoordinatedHeadState { localUTxO :: UTxOType tx -- ^ The latest UTxO resulting from applying 'localTxs' to -- 'confirmedSnapshot'. Spec: Lฬ‚ - , localTxs :: [tx] - -- ^ List of transactions applied locally and pending inclusion in a snapshot. - -- Ordering in this list is important as transactions are added in order of + , localTxs :: Seq tx + -- ^ Sequence of transactions applied locally and pending inclusion in a snapshot. + -- Uses Seq for O(1) append (snoc) instead of O(n) list append. + -- Ordering in this sequence is important as transactions are added in order of -- application. Spec: Tฬ‚ , allTxs :: !(Map.Map (TxIdType tx) tx) -- ^ Map containing all the transactions ever seen by this node and not yet diff --git a/hydra-node/src/Hydra/Ledger.hs b/hydra-node/src/Hydra/Ledger.hs index 45877c4372e..8c22810f023 100644 --- a/hydra-node/src/Hydra/Ledger.hs +++ b/hydra-node/src/Hydra/Ledger.hs @@ -5,6 +5,7 @@ module Hydra.Ledger where import Hydra.Prelude +import Data.Sequence (Seq (Empty), (|>)) import Hydra.Chain.ChainState (ChainSlot (..)) import Hydra.Tx.IsTx (IsTx (..)) import Test.QuickCheck.Instances.Natural () @@ -18,7 +19,7 @@ nextChainSlot (ChainSlot n) = ChainSlot (n + 1) -- | An abstract interface for a 'Ledger'. Allows to define mock / simpler -- implementation for testing as well as limiting feature-envy from the business -- logic by forcing a closed interface. -newtype Ledger tx = Ledger +data Ledger tx = Ledger { applyTransactions :: ChainSlot -> UTxOType tx -> @@ -28,18 +29,33 @@ newtype Ledger tx = Ledger -- validation failures returned from the ledger. -- TODO: 'ValidationError' should also include the UTxO, which is not -- necessarily the same as the given UTxO after some transactions + , collectTransactions :: + ChainSlot -> + UTxOType tx -> + Seq tx -> + (Seq tx, UTxOType tx) + -- ^ Collect applicable transactions and resulting UTxO. Unlike 'applyTransactions', + -- this function continues on validation errors, returning only the valid transactions. + -- OPTIMIZATION: Uses Seq for O(1) append. Implementations can optimize this to avoid + -- repeated UTxO format conversions when validating transactions one by one. } --- | Collect applicable transactions and resulting UTxO. In contrast to --- 'applyTransactions', this functions continues on validation errors. -collectTransactions :: Ledger tx -> ChainSlot -> UTxOType tx -> [tx] -> ([tx], UTxOType tx) -collectTransactions Ledger{applyTransactions} slot utxo = - foldr go ([], utxo) +-- | Default implementation of 'collectTransactions' using 'applyTransactions'. +-- This is less efficient than specialized implementations as it may perform +-- repeated format conversions. +defaultCollectTransactions :: + (ChainSlot -> UTxOType tx -> [tx] -> Either (tx, ValidationError) (UTxOType tx)) -> + ChainSlot -> + UTxOType tx -> + Seq tx -> + (Seq tx, UTxOType tx) +defaultCollectTransactions applyTxs slot utxo = + foldl' go (Empty, utxo) where - go tx (applicableTxs, u) = - case applyTransactions slot u [tx] of + go (applicableTxs, u) tx = + case applyTxs slot u [tx] of Left _ -> (applicableTxs, u) - Right u' -> (applicableTxs <> [tx], u') + Right u' -> (applicableTxs |> tx, u') -- | Either valid or an error which we get from the ledger-specs tx validation. data ValidationResult diff --git a/hydra-node/src/Hydra/Ledger/Cardano.hs b/hydra-node/src/Hydra/Ledger/Cardano.hs index e1fd7dedb5d..eedf7b2ae16 100644 --- a/hydra-node/src/Hydra/Ledger/Cardano.hs +++ b/hydra-node/src/Hydra/Ledger/Cardano.hs @@ -39,6 +39,7 @@ import Control.Monad (foldM) import Data.ByteString qualified as BS import Data.Default (def) import Data.Map qualified as Map +import Data.Sequence (Seq (Empty), (|>)) import Data.Set qualified as Set import Hydra.Chain.ChainState (ChainSlot (..)) import Hydra.Ledger (Ledger (..), ValidationError (..)) @@ -58,53 +59,57 @@ import Test.QuickCheck ( -- | Use the cardano-ledger as an in-hydra 'Ledger'. cardanoLedger :: Ledger.Globals -> Ledger.LedgerEnv LedgerEra -> Ledger Tx cardanoLedger globals ledgerEnv = - Ledger{applyTransactions} + Ledger{applyTransactions, collectTransactions} where - -- NOTE(SN): See full note on 'applyTx' why we only have a single transaction - -- application here. - applyTransactions slot utxo = \case + -- OPTIMIZATION: Convert to Shelley format once at the beginning and back once at the end. + -- This reduces conversion overhead from O(n * txCount) to O(n) for batch validation. + applyTransactions slot utxo txs = case txs of [] -> Right utxo - (tx : txs) -> do - utxo' <- applyTx slot utxo tx - applyTransactions slot utxo' txs + _ -> + let shelleyUtxo = UTxO.toShelleyUTxO shelleyBasedEra utxo + in case applyTxsShelley slot shelleyUtxo txs of + Left err -> Left err + Right shelleyUtxo' -> Right $ UTxO.fromShelleyUTxO shelleyBasedEra shelleyUtxo' - -- TODO(SN): Pre-validate transactions to get less confusing errors on - -- transactions which are not expected to work on a layer-2 - -- NOTE(SN): This is will fail on any transaction requiring the 'DPState' to be - -- in a certain state as we do throw away the resulting 'DPState' and only take - -- the ledger's 'UTxO' forward. - -- - -- We came to this signature of only applying a single transaction because we - -- got confused why a sequence of transactions worked but sequentially applying - -- single transactions didn't. This was because of this not-keeping the'DPState' - -- as described above. - applyTx (ChainSlot slot) utxo tx = + -- OPTIMIZATION: Collect applicable transactions while keeping UTxO in Shelley format + -- throughout. This avoids N conversions when validating N transactions one by one. + -- Instead, we convert once at the start and once at the end. + -- Uses Seq for O(1) append instead of O(n) list append. + collectTransactions slot utxo txs = case txs of + Empty -> (Empty, utxo) + _ -> + let shelleyUtxo = UTxO.toShelleyUTxO shelleyBasedEra utxo + (validTxs, finalShelleyUtxo) = collectTxsShelley slot shelleyUtxo txs + in (validTxs, UTxO.fromShelleyUTxO shelleyBasedEra finalShelleyUtxo) + + -- Collect valid transactions, keeping UTxO in Shelley format throughout. + -- Uses Seq for O(1) append. + collectTxsShelley slot shelleyUtxo = foldl' go (Empty, shelleyUtxo) + where + go (validTxs, u) tx = + case applyTxShelley slot u tx of + Left _ -> (validTxs, u) + Right u' -> (validTxs |> tx, u') + + -- Apply transactions keeping UTxO in Shelley format throughout + applyTxsShelley _ shelleyUtxo [] = Right shelleyUtxo + applyTxsShelley slot shelleyUtxo (tx : txs) = do + shelleyUtxo' <- applyTxShelley slot shelleyUtxo tx + applyTxsShelley slot shelleyUtxo' txs + + -- Apply a single transaction with UTxO already in Shelley format + applyTxShelley (ChainSlot slot) shelleyUtxo tx = case Ledger.applyTx globals env' memPoolState (toLedgerTx tx) of Left err -> Left (tx, toValidationError err) Right (Ledger.LedgerState{Ledger.lsUTxOState = us}, _validatedTx) -> - Right . UTxO.fromShelleyUTxO shelleyBasedEra $ Ledger.utxosUtxo us + Right $ Ledger.utxosUtxo us where - -- As we use applyTx we only expect one ledger rule to run and one tx to - -- fail validation, hence using the heads of non empty lists is fine. - toValidationError :: Ledger.ApplyTxError LedgerEra -> ValidationError - toValidationError (Ledger.ApplyTxError (e :| _)) = case e of - (ConwayUtxowFailure (UtxoFailure (UtxosFailure (ValidationTagMismatch _ (FailedUnexpectedly (PlutusFailure msg ctx :| _)))))) -> - ValidationError $ - "Plutus validation failed: " - <> msg - <> "Debug info: " - -- NOTE: There is not a clear reason why 'debugPlutus' is an IO - -- action. It only re-evaluates the script and does not have any - -- side-effects. - <> show (unsafeDupablePerformIO $ debugPlutus (decodeUtf8 ctx) $ PlutusDebugOverrides Nothing Nothing Nothing Nothing Nothing Nothing) - _ -> ValidationError $ show e - env' = ledgerEnv{Ledger.ledgerSlotNo = fromIntegral slot} memPoolState = def - & Ledger.lsUTxOStateL . Ledger.utxoL .~ UTxO.toShelleyUTxO shelleyBasedEra utxo + & Ledger.lsUTxOStateL . Ledger.utxoL .~ shelleyUtxo & Ledger.lsCertStateL . Ledger.certDStateL %~ mockCertState -- NOTE: Mocked certificate state that simulates any reward accounts for any @@ -120,6 +125,20 @@ cardanoLedger globals ledgerEnv = & Map.keysSet & Set.map raCredential + -- As we use applyTx we only expect one ledger rule to run and one tx to + -- fail validation, hence using the heads of non empty lists is fine. + toValidationError (Ledger.ApplyTxError (e :| _)) = case e of + (ConwayUtxowFailure (UtxoFailure (UtxosFailure (ValidationTagMismatch _ (FailedUnexpectedly (PlutusFailure msg ctx :| _)))))) -> + ValidationError $ + "Plutus validation failed: " + <> msg + <> "Debug info: " + -- NOTE: There is not a clear reason why 'debugPlutus' is an IO + -- action. It only re-evaluates the script and does not have any + -- side-effects. + <> show (unsafeDupablePerformIO $ debugPlutus (decodeUtf8 ctx) $ PlutusDebugOverrides Nothing Nothing Nothing Nothing Nothing Nothing) + _ -> ValidationError $ show e + -- * LedgerEnv -- | Create a new ledger env from given protocol parameters. @@ -236,12 +255,16 @@ mkRangedTx (txin, TxOut owner valueIn datum refScript) (recipient, valueOut) sk -- the outputs added, correctly indexed by the `TxIn`. This function is useful -- to manually maintain a `UTxO` set without caring too much about the `Ledger` -- rules. +-- +-- OPTIMIZATION: Uses Map.withoutKeys for O(m log n) instead of O(n log n) +-- where m = consumed inputs, n = UTxO set size. adjustUTxO :: Tx -> UTxO -> UTxO adjustUTxO tx utxo = let txid = Hydra.Tx.txId tx - consumed = txIns' tx + consumed = Set.fromList $ txIns' tx produced = UTxO.fromList ((\(txout, ix) -> (TxIn txid (TxIx ix), toCtxUTxOTxOut txout)) <$> zip (txOuts' tx) [0 ..]) - utxo' = UTxO.fromList $ filter (\(txin, _) -> txin `notElem` consumed) $ UTxO.toList utxo + -- Use withoutKeys for efficient removal: O(m log n) instead of O(n log n) + utxo' = UTxO (Map.withoutKeys (UTxO.toMap utxo) consumed) in utxo' <> produced -- * Generators diff --git a/hydra-node/src/Hydra/Ledger/Simple.hs b/hydra-node/src/Hydra/Ledger/Simple.hs index 0d0584ff8c2..547f74bef37 100644 --- a/hydra-node/src/Hydra/Ledger/Simple.hs +++ b/hydra-node/src/Hydra/Ledger/Simple.hs @@ -24,6 +24,7 @@ import Hydra.Chain.ChainState (ChainSlot (..), ChainStateType, IsChainState (..) import Hydra.Ledger ( Ledger (..), ValidationError (ValidationError), + defaultCollectTransactions, ) import Hydra.Tx (IsTx (..)) @@ -117,7 +118,7 @@ instance IsChainState SimpleTx where simpleLedger :: Ledger SimpleTx simpleLedger = - Ledger{applyTransactions} + Ledger{applyTransactions, collectTransactions = defaultCollectTransactions applyTransactions} where -- NOTE: _slot is unused as SimpleTx transactions don't have a notion of time. applyTransactions :: Foldable t => p -> Set SimpleTxOut -> t SimpleTx -> Either (SimpleTx, ValidationError) (Set SimpleTxOut) diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index cc4f1ee9a2a..6bc17b7de43 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -43,6 +43,7 @@ import Hydra.HeadLogic ( aggregateState, ) import Hydra.HeadLogic qualified as HeadLogic +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) import Hydra.HeadLogic.Outcome (StateChanged (..)) import Hydra.HeadLogic.State (getHeadParameters) import Hydra.HeadLogic.StateEvent (StateEvent (..)) @@ -226,12 +227,12 @@ hydrate tracer env ledger initialChainState EventStore{eventSource, eventSink} e ) wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ()) -wireChainInput node = enqueue . ChainInput +wireChainInput node = enqueue HighPriority . ChainInput where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ()) -wireClientInput node = enqueue . ClientInput +wireClientInput node = enqueue HighPriority . ClientInput where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node @@ -239,9 +240,11 @@ wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (Authenticated (Messa wireNetworkInput node = NetworkCallback { deliver = \Authenticated{party = sender, payload = msg} -> - enqueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + in enqueue (inputPriority input) input , onConnectivity = - enqueue . NetworkInput 1 . ConnectivityEvent + let input = NetworkInput 1 . ConnectivityEvent + in enqueue HighPriority . input } where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node @@ -321,7 +324,9 @@ stepHydraNode node = do maybeReenqueue q@Queued{queuedId, queuedItem} = case queuedItem of NetworkInput ttl msg - | ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg} + | ttl > 0 -> + let newItem = NetworkInput (ttl - 1) msg + in reenqueue (inputPriority newItem) waitDelay q{queuedItem = newItem} _ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem} Environment{party} = env @@ -391,7 +396,7 @@ processEffects node tracer inputId effects = do OnChainEffect{postChainTx} -> postTx postChainTx `catch` \(postTxError :: PostTxError tx) -> - enqueue . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing} + enqueue HighPriority . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing} traceWith tracer $ EndEffect party inputId effectId HydraNode diff --git a/hydra-node/src/Hydra/Node/InputQueue.hs b/hydra-node/src/Hydra/Node/InputQueue.hs index 20543e59054..52bcb27fac0 100644 --- a/hydra-node/src/Hydra/Node/InputQueue.hs +++ b/hydra-node/src/Hydra/Node/InputQueue.hs @@ -1,4 +1,6 @@ -- | The general input queue from which the Hydra head is fed with inputs. +-- This implementation uses a priority queue system to ensure protocol messages +-- (ReqSn, AckSn) are processed before transaction messages under high load. module Hydra.Node.InputQueue where import Hydra.Prelude @@ -7,22 +9,31 @@ import Control.Concurrent.Class.MonadSTM ( isEmptyTQueue, modifyTVar', readTQueue, + tryReadTQueue, writeTQueue, ) +import Hydra.HeadLogic.Input (MessagePriority (..)) --- | The single, required queue in the system from which a hydra head is "fed". +-- | The input queue system with priority support. High priority messages +-- (protocol messages like ReqSn, AckSn) are processed before low priority +-- messages (transaction requests) to ensure snapshot progress under high load. +-- -- NOTE(SN): this probably should be bounded and include proper logging -- NOTE(SN): handle pattern, but likely not required as there is no need for an -- alternative implementation data InputQueue m e = InputQueue - { enqueue :: e -> m () - , reenqueue :: DiffTime -> Queued e -> m () + { enqueue :: MessagePriority -> e -> m () + , reenqueue :: MessagePriority -> DiffTime -> Queued e -> m () , dequeue :: m (Queued e) , isEmpty :: m Bool } data Queued a = Queued {queuedId :: Word64, queuedItem :: a} +-- | Create an input queue with priority support. The queue maintains two +-- internal queues: one for high priority messages (protocol) and one for +-- low priority messages (transactions). Dequeue always tries high priority +-- first before falling back to low priority. createInputQueue :: ( MonadDelay m , MonadAsync m @@ -31,27 +42,40 @@ createInputQueue :: m (InputQueue m e) createInputQueue = do numThreads <- newLabelledTVarIO "num-threads" (0 :: Integer) - nextId <- newLabelledTVarIO "nex-id" 0 - q <- newLabelledTQueueIO "input-queue" + nextId <- newLabelledTVarIO "next-id" 0 + -- Two separate queues for priority handling + highPriorityQueue <- newLabelledTQueueIO "input-queue-high" + lowPriorityQueue <- newLabelledTQueueIO "input-queue-low" pure InputQueue - { enqueue = \queuedItem -> + { enqueue = \priority queuedItem -> atomically $ do queuedId <- readTVar nextId - writeTQueue q Queued{queuedId, queuedItem} + let queued = Queued{queuedId, queuedItem} + case priority of + HighPriority -> writeTQueue highPriorityQueue queued + LowPriority -> writeTQueue lowPriorityQueue queued modifyTVar' nextId succ - , reenqueue = \delay e -> do + , reenqueue = \priority delay e -> do atomically $ modifyTVar' numThreads succ void . asyncLabelled "input-queue-reenqueue" $ do threadDelay delay atomically $ do modifyTVar' numThreads pred - writeTQueue q e + case priority of + HighPriority -> writeTQueue highPriorityQueue e + LowPriority -> writeTQueue lowPriorityQueue e , dequeue = - atomically $ readTQueue q + -- Always try high priority first, then fall back to low priority + atomically $ do + mHigh <- tryReadTQueue highPriorityQueue + case mHigh of + Just item -> pure item + Nothing -> readTQueue lowPriorityQueue , isEmpty = do atomically $ do n <- readTVar numThreads - isEmpty' <- isEmptyTQueue q - pure (isEmpty' && n == 0) + isHighEmpty <- isEmptyTQueue highPriorityQueue + isLowEmpty <- isEmptyTQueue lowPriorityQueue + pure (isHighEmpty && isLowEmpty && n == 0) } diff --git a/hydra-node/test/Hydra/BehaviorSpec.hs b/hydra-node/test/Hydra/BehaviorSpec.hs index d4bc8c7de7f..1f0e874b060 100644 --- a/hydra-node/test/Hydra/BehaviorSpec.hs +++ b/hydra-node/test/Hydra/BehaviorSpec.hs @@ -34,6 +34,7 @@ import Hydra.Chain.Direct.Handlers (LocalChainState, getLatest, newLocalChainSta import Hydra.Events (EventSink (..)) import Hydra.Events.Rotation (EventStore (..)) import Hydra.HeadLogic (CoordinatedHeadState (..), Effect (..), HeadState (..), InitialState (..), Input (..), OpenState (..)) +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) import Hydra.HeadLogicSpec (testSnapshot) import Hydra.Ledger (Ledger, nextChainSlot) import Hydra.Ledger.Simple (SimpleChainState (..), SimpleTx (..), aValidTx, simpleLedger, utxoRef, utxoRefs) @@ -1174,7 +1175,7 @@ simulatedChainAndNetwork initialChainState = do recordAndYieldEvent nodes history ev handleChainEvent :: HydraNode tx m -> ChainEvent tx -> m () -handleChainEvent HydraNode{inputQueue} = enqueue inputQueue . ChainInput +handleChainEvent HydraNode{inputQueue} = enqueue inputQueue HighPriority . ChainInput createMockNetwork :: MonadSTM m => DraftHydraNode tx m -> TVar m [HydraNode tx m] -> Network m (Message tx) createMockNetwork node nodes = @@ -1185,7 +1186,8 @@ createMockNetwork node nodes = mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = - enqueue inputQueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + in enqueue inputQueue (inputPriority input) input sender = getParty node @@ -1292,10 +1294,10 @@ createTestHydraClient :: TestHydraClient tx m createTestHydraClient outputs messages outputHistory HydraNode{inputQueue, nodeStateHandler} = TestHydraClient - { send = enqueue inputQueue . ClientInput + { send = enqueue inputQueue HighPriority . ClientInput , waitForNext = atomically (readTQueue outputs) , waitForNextMessage = atomically (readTQueue messages) - , injectChainEvent = enqueue inputQueue . ChainInput + , injectChainEvent = enqueue inputQueue HighPriority . ChainInput , serverOutputs = reverse <$> readTVarIO outputHistory , queryState = atomically (queryNodeState nodeStateHandler) } diff --git a/hydra-node/test/Hydra/Events/RotationSpec.hs b/hydra-node/test/Hydra/Events/RotationSpec.hs index 3b30affac05..fb25e87d0b7 100644 --- a/hydra-node/test/Hydra/Events/RotationSpec.hs +++ b/hydra-node/test/Hydra/Events/RotationSpec.hs @@ -42,9 +42,11 @@ spec = parallel $ do it "rotates while running" $ \testHydrate -> do failAfter 1 $ do eventStore <- createMockEventStore - -- NOTE: because there will be 5 inputs processed in total, - -- this is hardcoded to ensure we get a checkpoint + a single event at the end - let rotationConfig = RotateAfter (Positive 3) + -- NOTE: because there will be 6 inputs processed in total (5 inputs + 1 tick), + -- this is hardcoded to ensure we get a checkpoint + a single event at the end. + -- With RotateAfter 4: after 5 events (5 > 4 = TRUE) rotation happens, + -- then 6th event is stored separately, giving us 2 events total. + let rotationConfig = RotateAfter (Positive 4) let s0 = initNodeState SimpleChainState{slot = ChainSlot 0} rotatingEventStore <- newRotatedEventStore rotationConfig s0 mkAggregator mkCheckpoint eventStore testHydrate rotatingEventStore [] @@ -56,7 +58,7 @@ spec = parallel $ do it "consistent state after restarting with rotation" $ \testHydrate -> do failAfter 1 $ do eventStore <- createMockEventStore - -- NOTE: because there will be 6 inputs processed in total, + -- NOTE: because there will be 8 inputs processed in total (5 inputs + 1 input + 2 ticks), -- this is hardcoded to ensure we get a single checkpoint event at the end let rotationConfig = RotateAfter (Positive 1) let s0 = initNodeState SimpleChainState{slot = ChainSlot 0} @@ -84,7 +86,7 @@ spec = parallel $ do let inputs = inputsToOpenHead ++ [closeInput] failAfter 1 $ do eventStore <- createMockEventStore - -- NOTE: because there will be 6 inputs processed in total, + -- NOTE: because there will be 7 inputs processed in total (6 inputs + 1 tick), -- this is hardcoded to ensure we get a single checkpoint event at the end let rotationConfig = RotateAfter (Positive 1) -- run rotated event store with prepared inputs @@ -115,7 +117,8 @@ spec = parallel $ do let inputs2 = drop 3 inputs failAfter 1 $ do let s0 = initNodeState SimpleChainState{slot = ChainSlot 0} - -- NOTE: because there will be 6 inputs processed in total, + -- NOTE: because there will be 8 inputs processed in total for restarted node + -- (3 inputs + 3 inputs + 2 ticks) vs 7 for non-restarted (6 inputs + 1 tick), -- this is hardcoded to ensure we get a single checkpoint event at the end let rotationConfig = RotateAfter (Positive 1) -- run restarted node with prepared inputs @@ -142,7 +145,8 @@ spec = parallel $ do [StateEvent{eventId = eventId', stateChanged = checkpoint'}] <- getEvents (eventSource rotatingEventStore') checkpoint `shouldBe` checkpoint' -- stored events should yield consistent event ids - eventId `shouldBe` eventId' + -- note the restarted node has more Tick events (one extra per primeWith call) + eventId `shouldBe` eventId' + 1 describe "Rotation algorithm" $ do prop "rotates on startup" $ diff --git a/hydra-node/test/Hydra/HeadLogicSnapshotSpec.hs b/hydra-node/test/Hydra/HeadLogicSnapshotSpec.hs index 5c9fedab19a..26509fbce1f 100644 --- a/hydra-node/test/Hydra/HeadLogicSnapshotSpec.hs +++ b/hydra-node/test/Hydra/HeadLogicSnapshotSpec.hs @@ -8,6 +8,8 @@ import Test.Hydra.Prelude import Data.List qualified as List import Data.Map.Strict qualified as Map +import Data.Sequence (Seq (Empty)) +import Data.Sequence qualified as Seq import Hydra.HeadLogic (CoordinatedHeadState (..), Effect (..), HeadState (..), OpenState (OpenState), Outcome, SeenSnapshot (..), coordinatedHeadState, isLeader, update) import Hydra.HeadLogicSpec (StepState, getState, hasEffect, hasEffectSatisfying, hasNoEffectSatisfying, inOpenState, inOpenState', receiveMessage, receiveMessageFrom, runHeadLogic, step) import Hydra.Ledger.Simple (SimpleTx (..), aValidTx, simpleLedger, utxoRef) @@ -55,7 +57,7 @@ spec = do CoordinatedHeadState { localUTxO = u0 , allTxs = mempty - , localTxs = mempty + , localTxs = Empty , confirmedSnapshot = InitialSnapshot testHeadId u0 , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -85,7 +87,7 @@ spec = do it "does NOT send ReqSn when we are NOT the leader even if no snapshot in flight" $ do let tx = aValidTx 1 - st = coordinatedHeadState{localTxs = [tx]} + st = coordinatedHeadState{localTxs = Seq.singleton tx} outcome = update (envFor bobSk) simpleLedger (inOpenState' [alice, bob] st) $ receiveMessageFrom bob $ ReqTx tx outcome `hasNoEffectSatisfying` sendReqSn @@ -104,7 +106,7 @@ spec = do st' = inOpenState' threeParties $ coordinatedHeadState - { localTxs = [tx] + { localTxs = Seq.singleton tx , allTxs = Map.singleton (txId tx) tx , localUTxO = u0 <> utxoRef (txId tx) , seenSnapshot = RequestedSnapshot{lastSeen = 0, requested = 1} @@ -201,7 +203,7 @@ prop_singleMemberHeadAlwaysSnapshotOnReqTx sn = monadicST $ do CoordinatedHeadState { localUTxO = mempty , allTxs = mempty - , localTxs = [] + , localTxs = Empty , confirmedSnapshot = sn , seenSnapshot , currentDepositTxId = Nothing diff --git a/hydra-node/test/Hydra/HeadLogicSpec.hs b/hydra-node/test/Hydra/HeadLogicSpec.hs index 6f698e49d2d..b7339d59796 100644 --- a/hydra-node/test/Hydra/HeadLogicSpec.hs +++ b/hydra-node/test/Hydra/HeadLogicSpec.hs @@ -17,6 +17,8 @@ import Control.Lens ((.~)) import Data.List qualified as List import Data.Map (notMember) import Data.Map qualified as Map +import Data.Sequence (Seq (Empty)) +import Data.Sequence qualified as Seq import Data.Set qualified as Set import Hydra.API.ClientInput (ClientInput (SideLoadSnapshot)) import Hydra.API.ServerOutput (DecommitInvalidReason (..)) @@ -90,7 +92,7 @@ spec = CoordinatedHeadState { localUTxO = mempty , allTxs = mempty - , localTxs = mempty + , localTxs = Empty , confirmedSnapshot = InitialSnapshot testHeadId mempty , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -155,7 +157,7 @@ spec = case headState s of Open OpenState{coordinatedHeadState = CoordinatedHeadState{localTxs}} -> do - localTxs `shouldBe` [tx2, tx3] + localTxs `shouldBe` Seq.fromList [tx2, tx3] _ -> fail "expected Open state" describe "Deposit" $ do @@ -836,7 +838,7 @@ spec = coordinatedHeadState { localUTxO = utxoRef 4 , allTxs = Map.fromList [(txId tx2, tx2), (txId tx3, tx3)] - , localTxs = [tx2, tx3] + , localTxs = Seq.fromList [tx2, tx3] , confirmedSnapshot = ConfirmedSnapshot snapshot1 multisig1 , seenSnapshot = RequestedSnapshot{lastSeen = 1, requested = 2} } @@ -956,7 +958,7 @@ spec = CoordinatedHeadState { localUTxO = mempty , allTxs = mempty - , localTxs = [] + , localTxs = Empty , confirmedSnapshot = InitialSnapshot testHeadId mempty , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -1052,7 +1054,7 @@ spec = CoordinatedHeadState { localUTxO = uncurry UTxO.singleton utxo , allTxs = mempty - , localTxs = [expiringTransaction] + , localTxs = Seq.singleton expiringTransaction , confirmedSnapshot = InitialSnapshot testHeadId $ uncurry UTxO.singleton utxo , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -1078,7 +1080,7 @@ spec = OpenState { coordinatedHeadState = CoordinatedHeadState{localTxs} - } -> null localTxs + } -> Seq.null localTxs _ -> False prop "empty inputs in decommit tx are prevented" $ \tx -> do @@ -1093,7 +1095,7 @@ spec = CoordinatedHeadState { localUTxO = mempty , allTxs = mempty - , localTxs = [] + , localTxs = Empty , confirmedSnapshot = InitialSnapshot testHeadId mempty , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing @@ -1263,7 +1265,7 @@ inOpenState parties = CoordinatedHeadState { localUTxO = u0 , allTxs = mempty - , localTxs = mempty + , localTxs = Empty , confirmedSnapshot , seenSnapshot = NoSeenSnapshot , currentDepositTxId = Nothing diff --git a/hydra-node/test/Hydra/Model/MockChain.hs b/hydra-node/test/Hydra/Model/MockChain.hs index 9c25bd31a5b..829d89dc5e6 100644 --- a/hydra-node/test/Hydra/Model/MockChain.hs +++ b/hydra-node/test/Hydra/Model/MockChain.hs @@ -58,7 +58,8 @@ import Hydra.HeadLogic ( Input (..), OpenState (..), ) -import Hydra.Ledger (Ledger (..), ValidationError (..), collectTransactions) +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) +import Hydra.Ledger (Ledger (..), ValidationError (..), defaultCollectTransactions) import Hydra.Ledger.Cardano (adjustUTxO, fromChainSlot) import Hydra.Ledger.Cardano.Evaluate (eraHistoryWithoutHorizon, evaluateTx, renderEvaluationReport) import Hydra.Logging (Tracer) @@ -189,7 +190,7 @@ mockChainAndNetwork tr seedKeys commits = do , chainHandler = chainSyncHandler tr - (enqueue . ChainInput) + (enqueue HighPriority . ChainInput) getTimeHandle ctx localChainState @@ -320,7 +321,9 @@ mockChainAndNetwork tr seedKeys commits = do -- UTxO) are silently dropped which emulates the chain behaviour that -- only the client is potentially witnessing the failure, and no -- invalid transaction will ever be included in the chain. - (txs', utxo') = collectTransactions ledger newSlot utxo transactions + -- Convert to/from Seq for collectTransactions interface + (txsSeq, utxo') = collectTransactions ledger newSlot utxo (Seq.fromList transactions) + txs' = toList txsSeq in (newSlot, position, blocks :|> (header, txs', utxo'), utxo') -- | Construct fixed 'TimeHandle' that starts from 0 and has the era horizon far in the future. @@ -339,7 +342,7 @@ fixedTimeHandleIndefiniteHorizon = do scriptLedger :: Ledger Tx scriptLedger = - Ledger{applyTransactions} + Ledger{applyTransactions, collectTransactions = defaultCollectTransactions applyTransactions} where -- XXX: We could easily add 'slot' validation here and this would already -- emulate the dropping of outdated transactions from the cardano-node @@ -375,7 +378,8 @@ createMockNetwork draftNode nodes = mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = do - enqueue inputQueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + enqueue inputQueue (inputPriority input) input sender = getParty draftNode diff --git a/hydra-node/test/Hydra/Node/InputQueueSpec.hs b/hydra-node/test/Hydra/Node/InputQueueSpec.hs index 74ec0b096dd..47c00ef0eef 100644 --- a/hydra-node/test/Hydra/Node/InputQueueSpec.hs +++ b/hydra-node/test/Hydra/Node/InputQueueSpec.hs @@ -3,6 +3,7 @@ module Hydra.Node.InputQueueSpec where import Hydra.Prelude import Control.Monad.IOSim (IOSim, runSimOrThrow) +import Hydra.HeadLogic.Input (MessagePriority (..)) import Hydra.Node.InputQueue (Queued (queuedId), createInputQueue, dequeue, enqueue) import Test.Hspec (Spec) import Test.Hspec.QuickCheck (prop) @@ -22,7 +23,7 @@ prop_identify_enqueued_items (NonEmpty inputs) = test = do q <- createInputQueue forM inputs $ \i -> do - enqueue q i + enqueue q HighPriority i queuedId <$> dequeue q ids = runSimOrThrow test in isContinuous ids diff --git a/hydra-node/test/Hydra/NodeSpec.hs b/hydra-node/test/Hydra/NodeSpec.hs index 38c1abf482f..e6b76d4b17c 100644 --- a/hydra-node/test/Hydra/NodeSpec.hs +++ b/hydra-node/test/Hydra/NodeSpec.hs @@ -16,6 +16,7 @@ import Hydra.Chain.ChainState (ChainSlot (ChainSlot), IsChainState) import Hydra.Events (EventSink (..), EventSource (..), getEventId) import Hydra.Events.Rotation (EventStore (..), LogId) import Hydra.HeadLogic (Input (..), TTL) +import Hydra.HeadLogic.Input (inputPriority) import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged) import Hydra.HeadLogic.StateEvent (StateEvent (..), genStateEvent) import Hydra.HeadLogicSpec (inInitialState, receiveMessage, receiveMessageFrom, testSnapshot) @@ -28,6 +29,7 @@ import Hydra.Node ( DraftHydraNode, HydraNode (..), HydraNodeLog (..), + NodeStateHandler (..), checkHeadState, connect, hydrate, @@ -333,11 +335,15 @@ spec = parallel $ do entries <- fmap Logging.message <$> readTVarIO logs entries `shouldSatisfy` any isContestationPeriodMismatch --- | Add given list of inputs to the 'InputQueue'. This is returning the node to --- allow for chaining with 'runToCompletion'. -primeWith :: Monad m => [Input tx] -> HydraNode tx m -> m (HydraNode tx m) -primeWith inputs node@HydraNode{inputQueue = InputQueue{enqueue}} = do - forM_ inputs enqueue +-- | Add given list of inputs to the 'InputQueue'. A preceding 'Tick' is enqueued +-- to advance the chain slot and ensure the 'NodeState' is in sync. This is +-- returning the node to allow for chaining with 'runToCompletion'. +primeWith :: (MonadSTM m, MonadTime m) => [Input tx] -> HydraNode tx m -> m (HydraNode tx m) +primeWith inputs node@HydraNode{inputQueue = InputQueue{enqueue}, nodeStateHandler = NodeStateHandler{queryNodeState}} = do + now <- getCurrentTime + chainSlot <- currentSlot <$> atomically queryNodeState + let tick = ChainInput $ Tick now (chainSlot + 1) + forM_ (tick : inputs) $ \input -> enqueue (inputPriority input) input pure node -- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations. @@ -445,7 +451,7 @@ runToCompletion node@HydraNode{inputQueue = InputQueue{isEmpty}} = go -- | Creates a full 'HydraNode' with given parameters and primed 'Input's. Note -- that this node is 'notConnect'ed to any components. testHydraNode :: - (MonadDelay m, MonadAsync m, MonadLabelledSTM m, MonadThrow m, MonadUnliftIO m) => + (MonadDelay m, MonadAsync m, MonadLabelledSTM m, MonadThrow m, MonadUnliftIO m, MonadTime m) => Tracer m (HydraNodeLog SimpleTx) -> SigningKey HydraKey -> [Party] -> diff --git a/hydra-tx/src/Hydra/Tx/Crypto.hs b/hydra-tx/src/Hydra/Tx/Crypto.hs index 62d8d81a63f..a4fc4ad6227 100644 --- a/hydra-tx/src/Hydra/Tx/Crypto.hs +++ b/hydra-tx/src/Hydra/Tx/Crypto.hs @@ -50,7 +50,7 @@ import Cardano.Crypto.Hash (Blake2b_256, SHA256, castHash, hashFromBytes, hashTo import Cardano.Crypto.Hash qualified as Crypto import Cardano.Crypto.Hash.Class (HashAlgorithm (digest)) import Cardano.Crypto.Seed (getSeedBytes, mkSeedFromBytes) -import Cardano.Crypto.Util (SignableRepresentation) +import Cardano.Crypto.Util (SignableRepresentation (..)) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.ByteString.Base16 qualified as Base16 @@ -337,15 +337,46 @@ verifyMultiSignature :: MultiSignature a -> a -> Verified -verifyMultiSignature vks HydraMultiSignature{multiSignature} a +verifyMultiSignature vks multisig a = + -- OPTIMIZATION: Pre-compute the signable representation once instead of + -- once per party. With N parties, this reduces hashUTxO calls from 3N to 3. + verifyMultiSignatureWithBytes vks multisig (getSignableRepresentation a) + +-- | Verify a multi-signature using pre-computed signable bytes. +-- +-- This is an optimization for cases where the signable representation is +-- expensive to compute (e.g., snapshots with large UTxO sets). The caller +-- computes the bytes once and reuses them for all verifications. +verifyMultiSignatureWithBytes :: + [VerificationKey HydraKey] -> + MultiSignature a -> + ByteString -> + Verified +verifyMultiSignatureWithBytes vks HydraMultiSignature{multiSignature} signableBytes | length vks == length multiSignature = - let verifications = zipWith (\vk s -> (vk, verify vk s a)) vks multiSignature + let verifications = zipWith (\vk s -> (vk, verifyWithBytes vk s signableBytes)) vks multiSignature failures = fst <$> filter (not . snd) verifications in if null failures then Verified else FailedKeys failures | otherwise = KeyNumberMismatch +-- | Verify a signature using pre-computed signable bytes. +-- +-- This is an optimization for cases where the signable representation is +-- expensive to compute. The caller computes the bytes once and reuses them. +verifyWithBytes :: + VerificationKey HydraKey -> + Signature a -> + ByteString -> + Bool +verifyWithBytes (HydraVerificationKey vk) (HydraSignature sig) signableBytes = + case verifyDSIGN ctx vk signableBytes sig of + Right () -> True + Left _ -> False + where + ctx = () :: ContextDSIGN Ed25519DSIGN + toPlutusSignatures :: MultiSignature a -> [OnChain.Signature] toPlutusSignatures (HydraMultiSignature sigs) = toPlutusSignature <$> sigs