diff --git a/.github/actions/nix-cachix-setup/action.yml b/.github/actions/nix-cachix-setup/action.yml index fcf19333bb5..6c24c24a284 100644 --- a/.github/actions/nix-cachix-setup/action.yml +++ b/.github/actions/nix-cachix-setup/action.yml @@ -9,6 +9,21 @@ runs: using: composite steps: + - name: 🧹 Free disk space + if: runner.os == 'Linux' + shell: bash + run: | + echo "Disk space before cleanup:" + df -h / + # Remove unnecessary tools to free up disk space + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + sudo docker image prune --all --force || true + echo "Disk space after cleanup:" + df -h / + - name: ❄ Prepare nix uses: cachix/install-nix-action@v30 with: diff --git a/hydra-node/src/Hydra/HeadLogic/Input.hs b/hydra-node/src/Hydra/HeadLogic/Input.hs index 14291998e57..f212af24c16 100644 --- a/hydra-node/src/Hydra/HeadLogic/Input.hs +++ b/hydra-node/src/Hydra/HeadLogic/Input.hs @@ -1,17 +1,46 @@ {-# LANGUAGE UndecidableInstances #-} -module Hydra.HeadLogic.Input where +module Hydra.HeadLogic.Input ( + Input (..), + TTL, + MessagePriority (..), + inputPriority, +) where import Hydra.Prelude import Hydra.API.ClientInput (ClientInput) import Hydra.Chain (ChainEvent) import Hydra.Chain.ChainState (IsChainState) -import Hydra.Network.Message (Message, NetworkEvent) +import Hydra.Network.Message (Message (..), NetworkEvent (..)) import Hydra.Tx.IsTx (ArbitraryIsTx) type TTL = Natural +-- | Priority level for input messages. Protocol messages (ReqSn, AckSn) get +-- high priority to prevent them from being delayed by transaction messages +-- under high load. +data MessagePriority = HighPriority | LowPriority + deriving stock (Eq, Show, Generic) + +-- | Classify an input by its priority. Protocol messages that are critical +-- for snapshot progress get high priority, while transaction submissions +-- get low priority. +inputPriority :: Input tx -> MessagePriority +inputPriority = \case + -- Protocol messages: high priority to ensure snapshot progress + NetworkInput{networkEvent = ReceivedMessage{msg = ReqSn{}}} -> HighPriority + NetworkInput{networkEvent = ReceivedMessage{msg = AckSn{}}} -> HighPriority + -- Connectivity events: high priority for protocol health + NetworkInput{networkEvent = ConnectivityEvent{}} -> HighPriority + -- Transaction requests: low priority (can be delayed under load) + NetworkInput{networkEvent = ReceivedMessage{msg = ReqTx{}}} -> LowPriority + NetworkInput{networkEvent = ReceivedMessage{msg = ReqDec{}}} -> LowPriority + -- Client inputs: high priority (user-initiated actions) + ClientInput{} -> HighPriority + -- Chain events: high priority (must be processed promptly) + ChainInput{} -> HighPriority + -- | Inputs that are processed by the head logic (the "core"). Corresponding to -- each of the "shell" layers, we distinguish between inputs from the client, -- the network and the chain. diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index 6e2fd1a9cd0..6e0b38eef17 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -43,6 +43,7 @@ import Hydra.HeadLogic ( aggregateState, ) import Hydra.HeadLogic qualified as HeadLogic +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) import Hydra.HeadLogic.Outcome (StateChanged (..)) import Hydra.HeadLogic.State (getHeadParameters) import Hydra.HeadLogic.StateEvent (StateEvent (..)) @@ -226,12 +227,12 @@ hydrate tracer env ledger initialChainState EventStore{eventSource, eventSink} e ) wireChainInput :: DraftHydraNode tx m -> (ChainEvent tx -> m ()) -wireChainInput node = enqueue . ChainInput +wireChainInput node = enqueue HighPriority . ChainInput where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node wireClientInput :: DraftHydraNode tx m -> (ClientInput tx -> m ()) -wireClientInput node = enqueue . ClientInput +wireClientInput node = enqueue HighPriority . ClientInput where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node @@ -239,9 +240,11 @@ wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (Authenticated (Messa wireNetworkInput node = NetworkCallback { deliver = \Authenticated{party = sender, payload = msg} -> - enqueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + in enqueue (inputPriority input) input , onConnectivity = - enqueue . NetworkInput 1 . ConnectivityEvent + let input = NetworkInput 1 . ConnectivityEvent + in enqueue HighPriority . input } where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node @@ -322,7 +325,9 @@ stepHydraNode node = do maybeReenqueue q@Queued{queuedId, queuedItem} = case queuedItem of NetworkInput ttl msg - | ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg} + | ttl > 0 -> + let newItem = NetworkInput (ttl - 1) msg + in reenqueue (inputPriority newItem) waitDelay q{queuedItem = newItem} _ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem} Environment{party} = env @@ -391,7 +396,7 @@ processEffects node tracer inputId effects = do OnChainEffect{postChainTx} -> postTx postChainTx `catch` \(postTxError :: PostTxError tx) -> - enqueue . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing} + enqueue HighPriority . ChainInput $ PostTxError{postChainTx, postTxError, failingTx = Nothing} traceWith tracer $ EndEffect party inputId effectId HydraNode diff --git a/hydra-node/src/Hydra/Node/InputQueue.hs b/hydra-node/src/Hydra/Node/InputQueue.hs index 20543e59054..52bcb27fac0 100644 --- a/hydra-node/src/Hydra/Node/InputQueue.hs +++ b/hydra-node/src/Hydra/Node/InputQueue.hs @@ -1,4 +1,6 @@ -- | The general input queue from which the Hydra head is fed with inputs. +-- This implementation uses a priority queue system to ensure protocol messages +-- (ReqSn, AckSn) are processed before transaction messages under high load. module Hydra.Node.InputQueue where import Hydra.Prelude @@ -7,22 +9,31 @@ import Control.Concurrent.Class.MonadSTM ( isEmptyTQueue, modifyTVar', readTQueue, + tryReadTQueue, writeTQueue, ) +import Hydra.HeadLogic.Input (MessagePriority (..)) --- | The single, required queue in the system from which a hydra head is "fed". +-- | The input queue system with priority support. High priority messages +-- (protocol messages like ReqSn, AckSn) are processed before low priority +-- messages (transaction requests) to ensure snapshot progress under high load. +-- -- NOTE(SN): this probably should be bounded and include proper logging -- NOTE(SN): handle pattern, but likely not required as there is no need for an -- alternative implementation data InputQueue m e = InputQueue - { enqueue :: e -> m () - , reenqueue :: DiffTime -> Queued e -> m () + { enqueue :: MessagePriority -> e -> m () + , reenqueue :: MessagePriority -> DiffTime -> Queued e -> m () , dequeue :: m (Queued e) , isEmpty :: m Bool } data Queued a = Queued {queuedId :: Word64, queuedItem :: a} +-- | Create an input queue with priority support. The queue maintains two +-- internal queues: one for high priority messages (protocol) and one for +-- low priority messages (transactions). Dequeue always tries high priority +-- first before falling back to low priority. createInputQueue :: ( MonadDelay m , MonadAsync m @@ -31,27 +42,40 @@ createInputQueue :: m (InputQueue m e) createInputQueue = do numThreads <- newLabelledTVarIO "num-threads" (0 :: Integer) - nextId <- newLabelledTVarIO "nex-id" 0 - q <- newLabelledTQueueIO "input-queue" + nextId <- newLabelledTVarIO "next-id" 0 + -- Two separate queues for priority handling + highPriorityQueue <- newLabelledTQueueIO "input-queue-high" + lowPriorityQueue <- newLabelledTQueueIO "input-queue-low" pure InputQueue - { enqueue = \queuedItem -> + { enqueue = \priority queuedItem -> atomically $ do queuedId <- readTVar nextId - writeTQueue q Queued{queuedId, queuedItem} + let queued = Queued{queuedId, queuedItem} + case priority of + HighPriority -> writeTQueue highPriorityQueue queued + LowPriority -> writeTQueue lowPriorityQueue queued modifyTVar' nextId succ - , reenqueue = \delay e -> do + , reenqueue = \priority delay e -> do atomically $ modifyTVar' numThreads succ void . asyncLabelled "input-queue-reenqueue" $ do threadDelay delay atomically $ do modifyTVar' numThreads pred - writeTQueue q e + case priority of + HighPriority -> writeTQueue highPriorityQueue e + LowPriority -> writeTQueue lowPriorityQueue e , dequeue = - atomically $ readTQueue q + -- Always try high priority first, then fall back to low priority + atomically $ do + mHigh <- tryReadTQueue highPriorityQueue + case mHigh of + Just item -> pure item + Nothing -> readTQueue lowPriorityQueue , isEmpty = do atomically $ do n <- readTVar numThreads - isEmpty' <- isEmptyTQueue q - pure (isEmpty' && n == 0) + isHighEmpty <- isEmptyTQueue highPriorityQueue + isLowEmpty <- isEmptyTQueue lowPriorityQueue + pure (isHighEmpty && isLowEmpty && n == 0) } diff --git a/hydra-node/test/Hydra/BehaviorSpec.hs b/hydra-node/test/Hydra/BehaviorSpec.hs index 46d05cc1f69..ef96e3bfe0c 100644 --- a/hydra-node/test/Hydra/BehaviorSpec.hs +++ b/hydra-node/test/Hydra/BehaviorSpec.hs @@ -35,6 +35,7 @@ import Hydra.Chain.Direct.Handlers (LocalChainState, getLatest, newLocalChainSta import Hydra.Events (EventSink (..)) import Hydra.Events.Rotation (EventStore (..)) import Hydra.HeadLogic (CoordinatedHeadState (..), Effect (..), HeadState (..), InitialState (..), Input (..), OpenState (..)) +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) import Hydra.HeadLogicSpec (testSnapshot) import Hydra.Ledger (Ledger, nextChainSlot) import Hydra.Ledger.Simple (SimpleChainState (..), SimpleTx (..), aValidTx, simpleLedger, utxoRef, utxoRefs) @@ -1188,7 +1189,7 @@ simulatedChainAndNetwork initialChainState = do recordAndYieldEvent nodes history ev handleChainEvent :: HydraNode tx m -> ChainEvent tx -> m () -handleChainEvent HydraNode{inputQueue} = enqueue inputQueue . ChainInput +handleChainEvent HydraNode{inputQueue} = enqueue inputQueue HighPriority . ChainInput createMockNetwork :: MonadSTM m => DraftHydraNode tx m -> TVar m [HydraNode tx m] -> Network m (Message tx) createMockNetwork node nodes = @@ -1199,7 +1200,8 @@ createMockNetwork node nodes = mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = - enqueue inputQueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + in enqueue inputQueue (inputPriority input) input sender = getParty node @@ -1312,10 +1314,10 @@ createTestHydraClient :: TestHydraClient tx m createTestHydraClient outputs messages outputHistory HydraNode{inputQueue, nodeStateHandler} = TestHydraClient - { send = enqueue inputQueue . ClientInput + { send = enqueue inputQueue HighPriority . ClientInput , waitForNext = atomically (readTQueue outputs) , waitForNextMessage = atomically (readTQueue messages) - , injectChainEvent = enqueue inputQueue . ChainInput + , injectChainEvent = enqueue inputQueue HighPriority . ChainInput , serverOutputs = reverse <$> readTVarIO outputHistory , queryState = atomically (queryNodeState nodeStateHandler) } diff --git a/hydra-node/test/Hydra/Model/MockChain.hs b/hydra-node/test/Hydra/Model/MockChain.hs index bbb9ea0de54..21b0b28f542 100644 --- a/hydra-node/test/Hydra/Model/MockChain.hs +++ b/hydra-node/test/Hydra/Model/MockChain.hs @@ -58,6 +58,7 @@ import Hydra.HeadLogic ( Input (..), OpenState (..), ) +import Hydra.HeadLogic.Input (MessagePriority (..), inputPriority) import Hydra.Ledger (Ledger (..), ValidationError (..), collectTransactions) import Hydra.Ledger.Cardano (adjustUTxO, fromChainSlot) import Hydra.Ledger.Cardano.Evaluate (eraHistoryWithoutHorizon, evaluateTx, renderEvaluationReport) @@ -190,7 +191,7 @@ mockChainAndNetwork tr seedKeys commits = do , chainHandler = chainSyncHandler tr - (enqueue . ChainInput) + (enqueue HighPriority . ChainInput) getTimeHandle ctx localChainState @@ -376,7 +377,8 @@ createMockNetwork draftNode nodes = mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = do - enqueue inputQueue $ mkNetworkInput sender msg + let input = mkNetworkInput sender msg + enqueue inputQueue (inputPriority input) input sender = getParty draftNode diff --git a/hydra-node/test/Hydra/Node/InputQueueSpec.hs b/hydra-node/test/Hydra/Node/InputQueueSpec.hs index 74ec0b096dd..47c00ef0eef 100644 --- a/hydra-node/test/Hydra/Node/InputQueueSpec.hs +++ b/hydra-node/test/Hydra/Node/InputQueueSpec.hs @@ -3,6 +3,7 @@ module Hydra.Node.InputQueueSpec where import Hydra.Prelude import Control.Monad.IOSim (IOSim, runSimOrThrow) +import Hydra.HeadLogic.Input (MessagePriority (..)) import Hydra.Node.InputQueue (Queued (queuedId), createInputQueue, dequeue, enqueue) import Test.Hspec (Spec) import Test.Hspec.QuickCheck (prop) @@ -22,7 +23,7 @@ prop_identify_enqueued_items (NonEmpty inputs) = test = do q <- createInputQueue forM inputs $ \i -> do - enqueue q i + enqueue q HighPriority i queuedId <$> dequeue q ids = runSimOrThrow test in isContinuous ids diff --git a/hydra-node/test/Hydra/NodeSpec.hs b/hydra-node/test/Hydra/NodeSpec.hs index 6dae3f05f8e..03627edafa5 100644 --- a/hydra-node/test/Hydra/NodeSpec.hs +++ b/hydra-node/test/Hydra/NodeSpec.hs @@ -16,6 +16,7 @@ import Hydra.Chain.ChainState (ChainSlot (ChainSlot), IsChainState) import Hydra.Events (EventSink (..), EventSource (..), getEventId) import Hydra.Events.Rotation (EventStore (..), LogId) import Hydra.HeadLogic (Input (..), TTL) +import Hydra.HeadLogic.Input (inputPriority) import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged) import Hydra.HeadLogic.StateEvent (StateEvent (..), genStateEvent) import Hydra.HeadLogicSpec (inInitialState, receiveMessage, receiveMessageFrom, testSnapshot) @@ -342,7 +343,7 @@ primeWith inputs node@HydraNode{inputQueue = InputQueue{enqueue}, nodeStateHandl now <- getCurrentTime chainSlot <- currentSlot <$> atomically queryNodeState let tick = ChainInput $ Tick now (chainSlot + 1) - forM_ (tick : inputs) enqueue + forM_ (tick : inputs) $ \input -> enqueue (inputPriority input) input pure node -- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations.