Skip to content

Commit 27d0cbb

Browse files
amesgentbagrel1
andcommitted
Adapt to improved ObjectDiffusion inbound interface
Co-authored-by: Thomas BAGREL <thomas.bagrel@tweag.io>
1 parent a909419 commit 27d0cbb

File tree

2 files changed

+33
-30
lines changed
  • ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion

2 files changed

+33
-30
lines changed

cabal.project

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ allow-newer:
5959
source-repository-package
6060
type: git
6161
location: https://github.com/IntersectMBO/ouroboros-network
62-
tag: peras-staging/pr-5202
63-
--sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U=
62+
tag: peras-staging/pr-5202-v2
63+
--sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ=
6464
subdir:
6565
ouroboros-network
6666
ouroboros-network-protocols

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ objectDiffusionInbound
146146
_version
147147
controlMessageSTM
148148
state =
149-
ObjectDiffusionInboundPipelined $ do
149+
ObjectDiffusionInboundPipelined $
150150
continueWithStateM (go Zero) initialInboundSt
151151
where
152152
canRequestMoreObjects :: InboundSt k object -> Bool
@@ -320,9 +320,10 @@ objectDiffusionInbound
320320
-- request.
321321
let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
322322
poolHasObject <- atomically $ opwHasObject
323-
continueWithStateM
324-
(go n)
325-
(preAcknowledge st' poolHasObject collectedIds)
323+
pure $
324+
continueWithStateM
325+
(go n)
326+
(preAcknowledge st' poolHasObject collectedIds)
326327
CollectObjects requestedIds collectedObjects -> do
327328
let requestedIdsSet = Set.fromList requestedIds
328329
obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects)
@@ -368,15 +369,16 @@ objectDiffusionInbound
368369
traceWith tracer $
369370
TraceObjectDiffusionProcessed
370371
(NumObjectsProcessed (fromIntegral $ length objectsToAck))
371-
continueWithStateM
372-
(go n)
373-
st
374-
{ pendingObjects = pendingObjects''
375-
, outstandingFifo = outstandingFifo'
376-
, numToAckOnNextReq =
377-
numToAckOnNextReq st
378-
+ fromIntegral (Seq.length objectIdsToAck)
379-
}
372+
pure $
373+
continueWithStateM
374+
(go n)
375+
st
376+
{ pendingObjects = pendingObjects''
377+
, outstandingFifo = outstandingFifo'
378+
, numToAckOnNextReq =
379+
numToAckOnNextReq st
380+
+ fromIntegral (Seq.length objectIdsToAck)
381+
}
380382

381383
goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m
382384
goReqObjectIdsBlocking = Stateful $ \st -> do
@@ -392,20 +394,21 @@ objectDiffusionInbound
392394
$ SendMsgRequestObjectIdsBlocking
393395
(numToAckOnNextReq st)
394396
numIdsToRequest
395-
( \neCollectedIds -> do
397+
( \neCollectedIds -> WithEffect $ do
396398
-- We just got some new object id's, so we are no longer idling
397399
--
398400
-- NOTE this change of state should be made explicit:
399401
-- https://github.com/tweag/cardano-peras/issues/144
400402
Idling.idlingStop (odisvIdling state)
401403
traceWith tracer TraceObjectInboundStoppedIdling
402-
collectAndContinueWithState
403-
(goCollect Zero)
404-
st
405-
{ numToAckOnNextReq = 0
406-
, numIdsInFlight = numIdsToRequest
407-
}
408-
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
404+
pure $
405+
collectAndContinueWithState
406+
(goCollect Zero)
407+
st
408+
{ numToAckOnNextReq = 0
409+
, numIdsInFlight = numIdsToRequest
410+
}
411+
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
409412
)
410413

411414
goReqObjectsAndObjectIdsPipelined ::
@@ -433,7 +436,7 @@ objectDiffusionInbound
433436
let numIdsToRequest = numIdsToReq st
434437

435438
if numIdsToRequest <= 0
436-
then continueWithStateM (go n) st
439+
then pure $ continueWithStateM (go n) st
437440
else
438441
pure $
439442
SendMsgRequestObjectIdsPipelined
@@ -454,8 +457,8 @@ objectDiffusionInbound
454457
terminateAfterDrain ::
455458
Nat n -> InboundStIdle n objectId object m ()
456459
terminateAfterDrain = \case
457-
Zero -> SendMsgDone (pure ())
458-
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n
460+
Zero -> SendMsgDone ()
461+
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n
459462

460463
-------------------------------------------------------------------------------
461464
-- Utilities to deal with stateful continuations (copied from TX-submission)
@@ -487,9 +490,9 @@ continueWithStateM ::
487490
NoThunks s =>
488491
StatefulM s n objectId object m ->
489492
s ->
490-
m (InboundStIdle n objectId object m ())
493+
InboundStIdle n objectId object m ()
491494
continueWithStateM (StatefulM f) !st =
492-
checkInvariant (show <$> unsafeNoThunks st) (f st)
495+
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st)
493496
{-# NOINLINE continueWithStateM #-}
494497

495498
-- | A variant of 'continueWithState' to be more easily utilized with
@@ -499,7 +502,7 @@ collectAndContinueWithState ::
499502
StatefulCollect s n objectId object m ->
500503
s ->
501504
Collect objectId object ->
502-
m (InboundStIdle n objectId object m ())
505+
InboundStIdle n objectId object m ()
503506
collectAndContinueWithState (StatefulCollect f) !st c =
504-
checkInvariant (show <$> unsafeNoThunks st) (f st c)
507+
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c)
505508
{-# NOINLINE collectAndContinueWithState #-}

0 commit comments

Comments
 (0)