Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/actions/nix-cachix-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ runs:
using: composite

steps:
- name: 🧹 Free disk space
if: runner.os == 'Linux'
shell: bash
run: |
echo "Disk space before cleanup:"
df -h /
# Remove unnecessary tools to free up disk space
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /opt/hostedtoolcache/CodeQL
sudo docker image prune --all --force || true
echo "Disk space after cleanup:"
df -h /

- name: ❄ Prepare nix
uses: cachix/install-nix-action@v30
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/ci-nix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ on:
branches:
- master
- release
- v1.2.0-base
pull_request:
branches:
- master
- v1.2.0-base
schedule:
# Everyday at 4:00 AM
- cron: "0 4 * * *"
Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ concurrency:

on:
push:
branches: [ "master" ]
branches: [ "master", "v1.2.0-base" ]
tags: [ "*.*.*" ]
pull_request:
branches: [ "master", "v1.2.0-base" ]
workflow_dispatch:
inputs:
ref_name:
Expand Down Expand Up @@ -73,6 +75,12 @@ jobs:
# And the version as the git commit.
VERSION=${{github.sha}}

# For PRs, tag as pr-<number>
if [[ "${{github.event_name}}" == "pull_request" ]]; then
IMAGE_LABEL=pr-${{github.event.pull_request.number}}
VERSION=${{github.event.pull_request.head.sha}}
fi

# Determine whether we are building a tag and if yes, set the label
# name to be the tag name, and the version to be the tag.
BUILDING_TAG=${{github.ref_type == 'tag'}}
Expand Down
68 changes: 45 additions & 23 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import Hydra.Prelude

import Data.List (elemIndex, minimumBy)
import Data.Map.Strict qualified as Map
import Data.Sequence (Seq (Empty), (|>))
import Data.Sequence qualified as Seq
import Data.Set ((\\))
import Data.Set qualified as Set
import Hydra.API.ClientInput (ClientInput (..))
Expand Down Expand Up @@ -73,7 +75,6 @@ import Hydra.HeadLogic.State (
)
import Hydra.Ledger (
Ledger (..),
applyTransactions,
)
import Hydra.Network qualified as Network
import Hydra.Network.Message (Message (..), NetworkEvent (..))
Expand Down Expand Up @@ -342,7 +343,7 @@ onOpenNetworkReqTx env ledger currentSlot st ttl tx =
-- spec. Do we really need to store that we have
-- requested a snapshot? If yes, should update spec.
<> newState SnapshotRequestDecided{snapshotNumber = nextSn}
<> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs') decommitTx currentDepositTxId)
<> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs') decommitTx currentDepositTxId)
else outcome

Environment{party} = env
Expand Down Expand Up @@ -371,7 +372,7 @@ onOpenNetworkReqTx env ledger currentSlot st ttl tx =

-- NOTE: Order of transactions is important here. See also
-- 'pruneTransactions'.
localTxs' = localTxs <> [tx]
localTxs' = localTxs |> tx

-- | Process a snapshot request ('ReqSn') from party.
--
Expand Down Expand Up @@ -543,21 +544,18 @@ onOpenNetworkReqSn env ledger pendingDeposits currentSlot st otherParty sv sn re
Error $ RequireFailed $ SnapshotDoesNotApply sn (txId tx) err
Right u -> cont u

pruneTransactions utxo = do
-- NOTE: Using foldl' is important to apply transacations in the correct
pruneTransactions utxo =
-- NOTE: Using collectTransactions applies transactions in the correct
-- order. That is, left-associative as new transactions are first validated
-- and then appended to `localTxs` (when aggregating
-- 'TransactionAppliedToLocalUTxO').
foldl' go ([], utxo) localTxs
where
go (txs, u) tx =
-- XXX: We prune transactions on any error, while only some of them are
-- actually expected.
-- For example: `OutsideValidityIntervalUTxO` ledger errors are expected
-- here when a tx becomes invalid.
case applyTransactions ledger currentSlot u [tx] of
Left (_, _) -> (txs, u)
Right u' -> (txs <> [tx], u')
-- XXX: We prune transactions on any error, while only some of them are
-- actually expected.
-- For example: `OutsideValidityIntervalUTxO` ledger errors are expected
-- here when a tx becomes invalid.
-- OPTIMIZATION: Uses Ledger's collectTransactions which works directly with
-- Seq to avoid list conversions. Also uses O(1) Seq append internally.
collectTransactions ledger currentSlot utxo localTxs

confSn = case confirmedSnapshot of
InitialSnapshot{} -> 0
Expand Down Expand Up @@ -690,13 +688,30 @@ onOpenNetworkAckSn Environment{party} pendingDeposits openState otherParty snaps

maybeRequestNextSnapshot previous outcome = do
let nextSn = previous.number + 1
if isLeader parameters party nextSn && not (null localTxs)
-- Check for active deposits that can be picked up (only if no deposit already in progress)
nextActiveDeposit =
if isNothing currentDepositTxId && isNothing decommitTx
then getNextActiveDeposit pendingDeposits
else Nothing
-- Use current deposit if in progress, otherwise use next active deposit
depositToInclude = currentDepositTxId <|> nextActiveDeposit
-- Request snapshot if we have pending txs OR a deposit to process
shouldRequest = isLeader parameters party nextSn && (not (Seq.null localTxs) || isJust depositToInclude)
if shouldRequest
then
outcome
<> newState SnapshotRequestDecided{snapshotNumber = nextSn}
<> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) decommitTx currentDepositTxId)
<> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs) decommitTx depositToInclude)
else outcome

-- \| Get the next active deposit to include in a snapshot request (FIFO by creation time)
getNextActiveDeposit :: (Eq (UTxOType tx), Monoid (UTxOType tx)) => Map (TxIdType tx) (Deposit tx) -> Maybe (TxIdType tx)
getNextActiveDeposit deposits =
let isActive (_, Deposit{deposited, status}) = deposited /= mempty && status == Active
in case filter isActive (Map.toList deposits) of
[] -> Nothing
xs -> Just $ fst $ minimumBy (comparing ((\Deposit{created} -> created) . snd)) xs

maybePostIncrementTx snapshot@Snapshot{utxoToCommit} signatures outcome =
-- TODO: check status (again)?
case find (\(_, Deposit{deposited}) -> Just deposited == utxoToCommit) $ Map.toList pendingDeposits of
Expand Down Expand Up @@ -890,7 +905,7 @@ onOpenNetworkReqDec env ledger ttl currentSlot openState decommitTx =

maybeRequestSnapshot =
if not snapshotInFlight && isLeader parameters party nextSn
then cause (NetworkEffect (ReqSn version nextSn (txId <$> localTxs) (Just decommitTx) Nothing))
then cause (NetworkEffect (ReqSn version nextSn (toList $ txId <$> localTxs) (Just decommitTx) Nothing))
else noop

Environment{party} = env
Expand Down Expand Up @@ -992,7 +1007,7 @@ onOpenChainTick env chainTime pendingDeposits st =
-- requested a snapshot? If yes, should update spec.
newState SnapshotRequestDecided{snapshotNumber = nextSn}
-- Spec: multicast (reqSn,̂ 𝑣,̄ 𝒮.𝑠 + 1,̂ 𝒯, 𝑈𝛼, ⊥)
<> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) Nothing (Just depositTxId))
<> cause (NetworkEffect $ ReqSn version nextSn (toList $ txId <$> localTxs) Nothing (Just depositTxId))
else
noop
where
Expand Down Expand Up @@ -1498,6 +1513,13 @@ aggregateNodeState nodeState sc =
ns{currentSlot = chainSlot}
ChainRolledBack{chainState} ->
ns{currentSlot = chainStateSlot chainState}
-- Restore full NodeState from checkpoint, including pendingDeposits
Checkpoint NodeState{headState = checkpointHeadState, pendingDeposits = checkpointDeposits, currentSlot = checkpointSlot} ->
NodeState
{ headState = checkpointHeadState
, pendingDeposits = checkpointDeposits
, currentSlot = checkpointSlot
}
_ -> ns

-- * HeadState aggregate
Expand Down Expand Up @@ -1552,7 +1574,7 @@ aggregate st = \case
CoordinatedHeadState
{ localUTxO = initialUTxO
, allTxs = mempty
, localTxs = mempty
, localTxs = Empty
, confirmedSnapshot = InitialSnapshot{headId, initialUTxO}
, seenSnapshot = NoSeenSnapshot
, currentDepositTxId = Nothing
Expand Down Expand Up @@ -1587,7 +1609,7 @@ aggregate st = \case
{ localUTxO = newLocalUTxO
, -- NOTE: Order of transactions is important here. See also
-- 'pruneTransactions'.
localTxs = localTxs <> [tx]
localTxs = localTxs |> tx
}
}
where
Expand Down Expand Up @@ -1675,14 +1697,14 @@ aggregate st = \case
InitialSnapshot{initialUTxO} ->
coordinatedHeadState
{ localUTxO = initialUTxO
, localTxs = mempty
, localTxs = Empty
, allTxs = mempty
, seenSnapshot = NoSeenSnapshot
}
ConfirmedSnapshot{snapshot = Snapshot{utxo}} ->
coordinatedHeadState
{ localUTxO = utxo
, localTxs = mempty
, localTxs = Empty
, allTxs = mempty
, seenSnapshot = LastSeenSnapshot snapshotNumber
}
Expand Down
33 changes: 31 additions & 2 deletions hydra-node/src/Hydra/HeadLogic/Input.hs
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
{-# LANGUAGE UndecidableInstances #-}

module Hydra.HeadLogic.Input where
module Hydra.HeadLogic.Input (
Input (..),
TTL,
MessagePriority (..),
inputPriority,
) where

import Hydra.Prelude

import Hydra.API.ClientInput (ClientInput)
import Hydra.Chain (ChainEvent)
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Network.Message (Message, NetworkEvent)
import Hydra.Network.Message (Message (..), NetworkEvent (..))
import Hydra.Tx.IsTx (ArbitraryIsTx)

type TTL = Natural

-- | Priority level for input messages. Protocol messages (ReqSn, AckSn) get
-- high priority to prevent them from being delayed by transaction messages
-- under high load.
data MessagePriority = HighPriority | LowPriority
deriving stock (Eq, Show, Generic)

-- | Classify an input by its priority. Protocol messages that are critical
-- for snapshot progress get high priority, while transaction submissions
-- get low priority.
inputPriority :: Input tx -> MessagePriority
inputPriority = \case
-- Protocol messages: high priority to ensure snapshot progress
NetworkInput{networkEvent = ReceivedMessage{msg = ReqSn{}}} -> HighPriority
NetworkInput{networkEvent = ReceivedMessage{msg = AckSn{}}} -> HighPriority
-- Connectivity events: high priority for protocol health
NetworkInput{networkEvent = ConnectivityEvent{}} -> HighPriority
-- Transaction requests: low priority (can be delayed under load)
NetworkInput{networkEvent = ReceivedMessage{msg = ReqTx{}}} -> LowPriority
NetworkInput{networkEvent = ReceivedMessage{msg = ReqDec{}}} -> LowPriority
-- Client inputs: high priority (user-initiated actions)
ClientInput{} -> HighPriority
-- Chain events: high priority (must be processed promptly)
ChainInput{} -> HighPriority

-- | Inputs that are processed by the head logic (the "core"). Corresponding to
-- each of the "shell" layers, we distinguish between inputs from the client,
-- the network and the chain.
Expand Down
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ data StateChanged tx
{ snapshot :: Snapshot tx
, requestedTxIds :: [TxIdType tx]
, newLocalUTxO :: UTxOType tx
, newLocalTxs :: [tx]
, newLocalTxs :: Seq tx
, newCurrentDepositTxId :: Maybe (TxIdType tx)
}
| PartySignedSnapshot {snapshot :: Snapshot tx, party :: Party, signature :: Signature (Snapshot tx)}
Expand Down
7 changes: 4 additions & 3 deletions hydra-node/src/Hydra/HeadLogic/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ data CoordinatedHeadState tx = CoordinatedHeadState
{ localUTxO :: UTxOType tx
-- ^ The latest UTxO resulting from applying 'localTxs' to
-- 'confirmedSnapshot'. Spec: L̂
, localTxs :: [tx]
-- ^ List of transactions applied locally and pending inclusion in a snapshot.
-- Ordering in this list is important as transactions are added in order of
, localTxs :: Seq tx
-- ^ Sequence of transactions applied locally and pending inclusion in a snapshot.
-- Uses Seq for O(1) append (snoc) instead of O(n) list append.
-- Ordering in this sequence is important as transactions are added in order of
-- application. Spec: T̂
, allTxs :: !(Map.Map (TxIdType tx) tx)
-- ^ Map containing all the transactions ever seen by this node and not yet
Expand Down
34 changes: 25 additions & 9 deletions hydra-node/src/Hydra/Ledger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Hydra.Ledger where

import Hydra.Prelude

import Data.Sequence (Seq (Empty), (|>))
import Hydra.Chain.ChainState (ChainSlot (..))
import Hydra.Tx.IsTx (IsTx (..))
import Test.QuickCheck.Instances.Natural ()
Expand All @@ -18,7 +19,7 @@ nextChainSlot (ChainSlot n) = ChainSlot (n + 1)
-- | An abstract interface for a 'Ledger'. Allows to define mock / simpler
-- implementation for testing as well as limiting feature-envy from the business
-- logic by forcing a closed interface.
newtype Ledger tx = Ledger
data Ledger tx = Ledger
{ applyTransactions ::
ChainSlot ->
UTxOType tx ->
Expand All @@ -28,18 +29,33 @@ newtype Ledger tx = Ledger
-- validation failures returned from the ledger.
-- TODO: 'ValidationError' should also include the UTxO, which is not
-- necessarily the same as the given UTxO after some transactions
, collectTransactions ::
ChainSlot ->
UTxOType tx ->
Seq tx ->
(Seq tx, UTxOType tx)
-- ^ Collect applicable transactions and resulting UTxO. Unlike 'applyTransactions',
-- this function continues on validation errors, returning only the valid transactions.
-- OPTIMIZATION: Uses Seq for O(1) append. Implementations can optimize this to avoid
-- repeated UTxO format conversions when validating transactions one by one.
}

-- | Collect applicable transactions and resulting UTxO. In contrast to
-- 'applyTransactions', this functions continues on validation errors.
collectTransactions :: Ledger tx -> ChainSlot -> UTxOType tx -> [tx] -> ([tx], UTxOType tx)
collectTransactions Ledger{applyTransactions} slot utxo =
foldr go ([], utxo)
-- | Default implementation of 'collectTransactions' using 'applyTransactions'.
-- This is less efficient than specialized implementations as it may perform
-- repeated format conversions.
defaultCollectTransactions ::
(ChainSlot -> UTxOType tx -> [tx] -> Either (tx, ValidationError) (UTxOType tx)) ->
ChainSlot ->
UTxOType tx ->
Seq tx ->
(Seq tx, UTxOType tx)
defaultCollectTransactions applyTxs slot utxo =
foldl' go (Empty, utxo)
where
go tx (applicableTxs, u) =
case applyTransactions slot u [tx] of
go (applicableTxs, u) tx =
case applyTxs slot u [tx] of
Left _ -> (applicableTxs, u)
Right u' -> (applicableTxs <> [tx], u')
Right u' -> (applicableTxs |> tx, u')

-- | Either valid or an error which we get from the ledger-specs tx validation.
data ValidationResult
Expand Down
Loading
Loading