Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/actions/nix-cachix-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ runs:
using: composite

steps:
- name: 🧹 Free disk space
if: runner.os == 'Linux'
shell: bash
run: |
echo "Disk space before cleanup:"
df -h /
# Remove unnecessary tools to free up disk space
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /opt/hostedtoolcache/CodeQL
sudo docker image prune --all --force || true
echo "Disk space after cleanup:"
df -h /

- name: ❄ Prepare nix
uses: cachix/install-nix-action@v30
with:
Expand Down
33 changes: 31 additions & 2 deletions hydra-node/src/Hydra/HeadLogic/Input.hs
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
{-# LANGUAGE UndecidableInstances #-}

module Hydra.HeadLogic.Input where
module Hydra.HeadLogic.Input (
Input (..),
TTL,
MessagePriority (..),
inputPriority,
) where

import Hydra.Prelude

import Hydra.API.ClientInput (ClientInput)
import Hydra.Chain (ChainEvent)
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Network.Message (Message, NetworkEvent)
import Hydra.Network.Message (Message (..), NetworkEvent (..))
import Hydra.Tx.IsTx (ArbitraryIsTx)

type TTL = Natural

-- | Priority level for input messages. Protocol messages (ReqSn, AckSn) get
-- high priority to prevent them from being delayed by transaction messages
-- under high load.
data MessagePriority = HighPriority | LowPriority
deriving stock (Eq, Show, Generic)

-- | Classify an input by its priority. Protocol messages that are critical
-- for snapshot progress get high priority, while transaction submissions
-- get low priority.
inputPriority :: Input tx -> MessagePriority
inputPriority = \case
-- Protocol messages: high priority to ensure snapshot progress
NetworkInput{networkEvent = ReceivedMessage{msg = ReqSn{}}} -> HighPriority
NetworkInput{networkEvent = ReceivedMessage{msg = AckSn{}}} -> HighPriority
-- Connectivity events: high priority for protocol health
NetworkInput{networkEvent = ConnectivityEvent{}} -> HighPriority
-- Transaction requests: low priority (can be delayed under load)
NetworkInput{networkEvent = ReceivedMessage{msg = ReqTx{}}} -> LowPriority
NetworkInput{networkEvent = ReceivedMessage{msg = ReqDec{}}} -> LowPriority
-- Client inputs: high priority (user-initiated actions)
ClientInput{} -> HighPriority
-- Chain events: high priority (must be processed promptly)
ChainInput{} -> HighPriority

-- | Inputs that are processed by the head logic (the "core"). Corresponding to
-- each of the "shell" layers, we distinguish between inputs from the client,
-- the network and the chain.
Expand Down
17 changes: 11 additions & 6 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import Hydra.HeadLogic (
aggregateState,
)
import Hydra.HeadLogic qualified as HeadLogic
import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority)
import Hydra.HeadLogic.Outcome (StateChanged (..))
import Hydra.HeadLogic.State (getHeadParameters)
import Hydra.HeadLogic.StateEvent (StateEvent (..))
Expand Down Expand Up @@ -226,22 +227,24 @@ hydrate tracer env ledger initialChainState EventStore{eventSource, eventSink} e
)

wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ())
wireChainInput node = enqueue . ChainInput
wireChainInput node = enqueue HighPriority . ChainInput
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node

wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ())
wireClientInput node = enqueue . ClientInput
wireClientInput node = enqueue HighPriority . ClientInput
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node

wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (Authenticated (Message tx)) m
wireNetworkInput node =
NetworkCallback
{ deliver = \Authenticated{party = sender, payload = msg} ->
enqueue $ mkNetworkInput sender msg
let input = mkNetworkInput sender msg
in enqueue (inputPriority input) input
, onConnectivity =
enqueue . NetworkInput 1 . ConnectivityEvent
let input = NetworkInput 1 . ConnectivityEvent
in enqueue HighPriority . input
}
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
Expand Down Expand Up @@ -322,7 +325,9 @@ stepHydraNode node = do
maybeReenqueue q@Queued{queuedId, queuedItem} =
case queuedItem of
NetworkInput ttl msg
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg}
| ttl > 0 ->
let newItem = NetworkInput (ttl - 1) msg
in reenqueue (inputPriority newItem) waitDelay q{queuedItem = newItem}
_ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem}

Environment{party} = env
Expand Down Expand Up @@ -391,7 +396,7 @@ processEffects node tracer inputId effects = do
OnChainEffect{postChainTx} ->
postTx postChainTx
`catch` \(postTxError :: PostTxError tx) ->
enqueue . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing}
enqueue HighPriority . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing}
traceWith tracer $ EndEffect party inputId effectId

HydraNode
Expand Down
48 changes: 36 additions & 12 deletions hydra-node/src/Hydra/Node/InputQueue.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-- | The general input queue from which the Hydra head is fed with inputs.
-- This implementation uses a priority queue system to ensure protocol messages
-- (ReqSn, AckSn) are processed before transaction messages under high load.
module Hydra.Node.InputQueue where

import Hydra.Prelude
Expand All @@ -7,22 +9,31 @@ import Control.Concurrent.Class.MonadSTM (
isEmptyTQueue,
modifyTVar',
readTQueue,
tryReadTQueue,
writeTQueue,
)
import Hydra.HeadLogic.Input (MessagePriority (..))

-- | The single, required queue in the system from which a hydra head is "fed".
-- | The input queue system with priority support. High priority messages
-- (protocol messages like ReqSn, AckSn) are processed before low priority
-- messages (transaction requests) to ensure snapshot progress under high load.
--
-- NOTE(SN): this probably should be bounded and include proper logging
-- NOTE(SN): handle pattern, but likely not required as there is no need for an
-- alternative implementation
data InputQueue m e = InputQueue
{ enqueue :: e -> m ()
, reenqueue :: DiffTime -> Queued e -> m ()
{ enqueue :: MessagePriority -> e -> m ()
, reenqueue :: MessagePriority -> DiffTime -> Queued e -> m ()
, dequeue :: m (Queued e)
, isEmpty :: m Bool
}

data Queued a = Queued {queuedId :: Word64, queuedItem :: a}

-- | Create an input queue with priority support. The queue maintains two
-- internal queues: one for high priority messages (protocol) and one for
-- low priority messages (transactions). Dequeue always tries high priority
-- first before falling back to low priority.
createInputQueue ::
( MonadDelay m
, MonadAsync m
Expand All @@ -31,27 +42,40 @@ createInputQueue ::
m (InputQueue m e)
createInputQueue = do
numThreads <- newLabelledTVarIO "num-threads" (0 :: Integer)
nextId <- newLabelledTVarIO "nex-id" 0
q <- newLabelledTQueueIO "input-queue"
nextId <- newLabelledTVarIO "next-id" 0
-- Two separate queues for priority handling
highPriorityQueue <- newLabelledTQueueIO "input-queue-high"
lowPriorityQueue <- newLabelledTQueueIO "input-queue-low"
pure
InputQueue
{ enqueue = \queuedItem ->
{ enqueue = \priority queuedItem ->
atomically $ do
queuedId <- readTVar nextId
writeTQueue q Queued{queuedId, queuedItem}
let queued = Queued{queuedId, queuedItem}
case priority of
HighPriority -> writeTQueue highPriorityQueue queued
LowPriority -> writeTQueue lowPriorityQueue queued
modifyTVar' nextId succ
, reenqueue = \delay e -> do
, reenqueue = \priority delay e -> do
atomically $ modifyTVar' numThreads succ
void . asyncLabelled "input-queue-reenqueue" $ do
threadDelay delay
atomically $ do
modifyTVar' numThreads pred
writeTQueue q e
case priority of
HighPriority -> writeTQueue highPriorityQueue e
LowPriority -> writeTQueue lowPriorityQueue e
, dequeue =
atomically $ readTQueue q
-- Always try high priority first, then fall back to low priority
atomically $ do
mHigh <- tryReadTQueue highPriorityQueue
case mHigh of
Just item -> pure item
Nothing -> readTQueue lowPriorityQueue
, isEmpty = do
atomically $ do
n <- readTVar numThreads
isEmpty' <- isEmptyTQueue q
pure (isEmpty' && n == 0)
isHighEmpty <- isEmptyTQueue highPriorityQueue
isLowEmpty <- isEmptyTQueue lowPriorityQueue
pure (isHighEmpty && isLowEmpty && n == 0)
}
10 changes: 6 additions & 4 deletions hydra-node/test/Hydra/BehaviorSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Hydra.Chain.Direct.Handlers (LocalChainState, getLatest, newLocalChainSta
import Hydra.Events (EventSink (..))
import Hydra.Events.Rotation (EventStore (..))
import Hydra.HeadLogic (CoordinatedHeadState (..), Effect (..), HeadState (..), InitialState (..), Input (..), OpenState (..))
import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority)
import Hydra.HeadLogicSpec (testSnapshot)
import Hydra.Ledger (Ledger, nextChainSlot)
import Hydra.Ledger.Simple (SimpleChainState (..), SimpleTx (..), aValidTx, simpleLedger, utxoRef, utxoRefs)
Expand Down Expand Up @@ -1188,7 +1189,7 @@ simulatedChainAndNetwork initialChainState = do
recordAndYieldEvent nodes history ev

handleChainEvent :: HydraNode tx m -> ChainEvent tx -> m ()
handleChainEvent HydraNode{inputQueue} = enqueue inputQueue . ChainInput
handleChainEvent HydraNode{inputQueue} = enqueue inputQueue HighPriority . ChainInput

createMockNetwork :: MonadSTM m => DraftHydraNode tx m -> TVar m [HydraNode tx m] -> Network m (Message tx)
createMockNetwork node nodes =
Expand All @@ -1199,7 +1200,8 @@ createMockNetwork node nodes =
mapM_ (`handleMessage` msg) allNodes

handleMessage HydraNode{inputQueue} msg =
enqueue inputQueue $ mkNetworkInput sender msg
let input = mkNetworkInput sender msg
in enqueue inputQueue (inputPriority input) input

sender = getParty node

Expand Down Expand Up @@ -1312,10 +1314,10 @@ createTestHydraClient ::
TestHydraClient tx m
createTestHydraClient outputs messages outputHistory HydraNode{inputQueue, nodeStateHandler} =
TestHydraClient
{ send = enqueue inputQueue . ClientInput
{ send = enqueue inputQueue HighPriority . ClientInput
, waitForNext = atomically (readTQueue outputs)
, waitForNextMessage = atomically (readTQueue messages)
, injectChainEvent = enqueue inputQueue . ChainInput
, injectChainEvent = enqueue inputQueue HighPriority . ChainInput
, serverOutputs = reverse <$> readTVarIO outputHistory
, queryState = atomically (queryNodeState nodeStateHandler)
}
Expand Down
6 changes: 4 additions & 2 deletions hydra-node/test/Hydra/Model/MockChain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import Hydra.HeadLogic (
Input (..),
OpenState (..),
)
import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority)
import Hydra.Ledger (Ledger (..), ValidationError (..), collectTransactions)
import Hydra.Ledger.Cardano (adjustUTxO, fromChainSlot)
import Hydra.Ledger.Cardano.Evaluate (eraHistoryWithoutHorizon, evaluateTx, renderEvaluationReport)
Expand Down Expand Up @@ -190,7 +191,7 @@ mockChainAndNetwork tr seedKeys commits = do
, chainHandler =
chainSyncHandler
tr
(enqueue . ChainInput)
(enqueue HighPriority . ChainInput)
getTimeHandle
ctx
localChainState
Expand Down Expand Up @@ -376,7 +377,8 @@ createMockNetwork draftNode nodes =
mapM_ (`handleMessage` msg) allNodes

handleMessage HydraNode{inputQueue} msg = do
enqueue inputQueue $ mkNetworkInput sender msg
let input = mkNetworkInput sender msg
enqueue inputQueue (inputPriority input) input

sender = getParty draftNode

Expand Down
3 changes: 2 additions & 1 deletion hydra-node/test/Hydra/Node/InputQueueSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Hydra.Node.InputQueueSpec where
import Hydra.Prelude

import Control.Monad.IOSim (IOSim, runSimOrThrow)
import Hydra.HeadLogic.Input (MessagePriority (..))
import Hydra.Node.InputQueue (Queued (queuedId), createInputQueue, dequeue, enqueue)
import Test.Hspec (Spec)
import Test.Hspec.QuickCheck (prop)
Expand All @@ -22,7 +23,7 @@ prop_identify_enqueued_items (NonEmpty inputs) =
test = do
q <- createInputQueue
forM inputs $ \i -> do
enqueue q i
enqueue q HighPriority i
queuedId <$> dequeue q
ids = runSimOrThrow test
in isContinuous ids
Expand Down
3 changes: 2 additions & 1 deletion hydra-node/test/Hydra/NodeSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Hydra.Chain.ChainState (ChainSlot (ChainSlot), IsChainState)
import Hydra.Events (EventSink (..), EventSource (..), getEventId)
import Hydra.Events.Rotation (EventStore (..), LogId)
import Hydra.HeadLogic (Input (..), TTL)
import Hydra.HeadLogic.Input (inputPriority)
import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged)
import Hydra.HeadLogic.StateEvent (StateEvent (..), genStateEvent)
import Hydra.HeadLogicSpec (inInitialState, receiveMessage, receiveMessageFrom, testSnapshot)
Expand Down Expand Up @@ -342,7 +343,7 @@ primeWith inputs node@HydraNode{inputQueue = InputQueue{enqueue}, nodeStateHandl
now <- getCurrentTime
chainSlot <- currentSlot <$> atomically queryNodeState
let tick = ChainInput $ Tick now (chainSlot + 1)
forM_ (tick : inputs) enqueue
forM_ (tick : inputs) $ \input -> enqueue (inputPriority input) input
pure node

-- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations.
Expand Down