4 changes: 4 additions & 0 deletions mempool/mempool.go
@@ -290,6 +290,10 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme
defer m.mtx.Unlock()
ctx := sdk.UnwrapSDKContext(goCtx)

// Wait for the legacypool to Reset at >= blockHeight (this may have
// already happened), to ensure all txs in pending pool are valid.
m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight())
Contributor

Is the idea to put a timeout into the PrepareProposalHandler such that an issue with the mempool won't block consensus? Or did you have some other way you anticipated handling long waits here?

Contributor Author

I wasn't planning on adding any app-side timeout to PrepareProposal, or any specific way of handling a stall here, for now. My thought was that if there is an issue here and the mempool isn't advancing, then with our new design the mempool needs to block consensus, since we can't be sure that we won't propose a bad block if this condition isn't met. Instead we should just let Comet's timeout_propose run out and have a new proposer try.

We could maintain some kind of cursor as we validate txs in the pending pool via ante handlers, and then have a timeout: if we are in the middle of validating when the timeout fires, we only allow the txs that have already been validated to be returned via Select.

@swift1337 brought up a good idea during our sync this morning: we should add a limit to how many txs we validate during the reorg loop, since with a massive backlog in the pending/queued pool we may reach a point where we cannot validate fast enough within a single block time. We thought about adding a timeout here, but if we add a timeout without some way of then clearing txs out of the mempool, we will just keep timing out whenever we try to select. So I think the better solution is some sort of maxBytesValidated or maxGasValidated, similar to creating blocks (but we may also need the cursor solution to know where validation stopped).

Either way, the solution is non-trivial imo and we should probably address it in a new ticket later (I'm assuming this won't be a blocker for getting some initial numbers on how this performs).
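To make the maxGasValidated plus cursor idea concrete, here is a rough sketch. Everything in it is hypothetical (the `pendingTx` type, `validateWithBudget`, and the `validate` callback standing in for the ante handler recheck); none of it exists in the pool today.

```go
package main

import "fmt"

// pendingTx is a hypothetical stand-in for a transaction in the pending pool.
type pendingTx struct {
	ID  string
	Gas uint64
}

// validateWithBudget validates txs in order until the next tx would push the
// total gas examined past maxGas. It returns the txs that passed validation
// and a cursor: the index of the first tx that was NOT examined. Txs that
// fail validation still count against the budget, since work was spent on them.
func validateWithBudget(txs []pendingTx, maxGas uint64, validate func(pendingTx) bool) (valid []pendingTx, cursor int) {
	var used uint64
	for cursor < len(txs) {
		tx := txs[cursor]
		if used+tx.Gas > maxGas {
			break // budget exhausted; cursor marks where to resume
		}
		used += tx.Gas
		if validate(tx) {
			valid = append(valid, tx)
		}
		cursor++
	}
	return valid, cursor
}

func main() {
	txs := []pendingTx{{"a", 40}, {"b", 40}, {"c", 40}}
	// Assumed validator: pretend tx "b" fails its recheck.
	valid, cursor := validateWithBudget(txs, 100, func(tx pendingTx) bool { return tx.ID != "b" })
	fmt.Println(len(valid), cursor) // 1 2
}
```

With a cursor like this, Select could return only `txs[:cursor]` that passed, and the next reorg run could resume from `cursor` instead of starting over.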

Contributor

Yeah, makes sense. The failure case is probably the same as the one we have right now: if max bytes/gas is sufficiently high, you'll sit in Select for minutes even after the chain has already timed out.

Member

It makes sense to benchmark some worst-case scenarios for the reorg loop based on the number of transactions and how many need to be moved/reordered and recommend some mempool parameters based on that. Do we know, for example, how much time validating 20k transactions would take?


evmIterator, cosmosIterator := m.getIterators(goCtx, i)

combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain)
90 changes: 68 additions & 22 deletions mempool/txpool/legacypool/legacypool.go
@@ -18,6 +18,7 @@
package legacypool

import (
"context"
"errors"
"maps"
"math/big"
@@ -255,13 +256,15 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgSubscriptionCh chan struct{} // notifies the reorg loop that a subscriber wants to wait on nextDone
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
latestReorgHeight atomic.Int64 // Latest height that the reorg loop has completed

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

@@ -282,22 +285,24 @@ func New(config Config, chain BlockChain) *LegacyPool {

// Create the transaction pool with its initial settings
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgSubscriptionCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
}
pool.priced = newPricedList(pool.all)
pool.latestReorgHeight.Store(0)

return pool
}
@@ -1262,7 +1267,8 @@ func (pool *LegacyPool) scheduleReorgLoop() {
queuedEvents[addr] = NewSortedMap()
}
queuedEvents[addr].Put(tx)

case <-pool.reorgSubscriptionCh:
Contributor Author

Added the ability for arbitrary subscriptions to the completion of nextDone (i.e. when the "next" reorg loop run completes). This works by a user pushing a subscribe request onto this channel; they must then immediately listen on pool.reorgDoneCh. This loop will push nextDone onto that channel. The user can then wait on the closure of nextDone, which essentially broadcasts to all holders of this channel (subscribers to the next run of the reorg loop) that it has completed.

pool.reorgDoneCh <- nextDone
case <-curDone:
curDone = nil

@@ -1342,6 +1348,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,

dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
if reset != nil && reset.newHead != nil {
pool.latestReorgHeight.Store(reset.newHead.Number.Int64())
}
pool.mu.Unlock()

// Notify subsystems for newly added transactions
@@ -1963,6 +1972,43 @@ func (pool *LegacyPool) Clear() {
pool.pendingNonces = newNoncer(pool.currentState)
}

// WaitForReorgHeight blocks until the reorg loop has reset at a head with
// height >= height. If the context is cancelled or the pool is shutting down,
// this will also return.
func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) {
for pool.latestReorgHeight.Load() < height {
// reorg loop has not run at the target height, subscribe to the
// outcome of the next reorg loop iteration to know when to check again
sub, err := pool.SubscribeToNextReorg()
Contributor Author

If we have not run the reorg loop for the target height yet, we wait for the outcome of the next iteration of the loop. We are explicitly not telling the reorg loop to run here, since that would simply run it again without advancing it to a new height (we would need to pass the latest headers to it for that to happen). Also, if we kick off a new run here and don't increment latestReorgHeight, we will continuously kick off new reorg loops until the txpool sees a new block and the reorg runs on it, essentially doing lots of wasted work.

if err != nil {
return
}

// need to check again in case reorg has finished in between initial
// check and subscribing to next reorg
if pool.latestReorgHeight.Load() >= height {
return
}

select {
case <-sub:
case <-ctx.Done():
return
Comment on lines +1995 to +1996
Contributor Author

There is likely a better way to handle this, but mempool.Select doesn't return an error, so if the context is cancelled during this call, we potentially allow invalid txs to be selected, which will then just be invalidated. If this happens we will likely time out in propose. I'm not 100% sure what it means for the context to be cancelled here (is the app shutting down?); depending on what's actually happening, panicking may be better. This is likely a follow-up and probably not important right now.

}
}
}

// SubscribeToNextReorg returns a channel that will close when the next reorg
// loop completes. An error is returned if the loop is shutting down.
func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) {
Member

I find this logic a bit confusing because we operate with three different channels here, but I assume this is due to the code being ported from geth

Contributor Author

Yeah, this is geth code I am adding onto, and I agree the channel/subscription logic of the reorg loop is super confusing, plus this just adds another path to it, which I don't love. After staring at the pattern for a bit I actually don't mind it though, initial learning curve aside.

For my and others' future reference, since this wasn't immediately obvious to me: at a high level they are creating a way to broadcast to multiple subscribers that the reorg loop finished. You can't easily do this by sending a "finished" message on a channel, since only a single subscriber will pull the message off. You could count the subscribers and send that many messages, but if one of those subscribers dies then you have a hanging message in the channel. To get around this, when someone subscribes, the reorg loop immediately sends the subscriber a channel on reorgDoneCh (assuming the subscriber then immediately waits to receive it); this received channel is now their subscription. The reorg loop also keeps a reference to this channel and closes it when its next iteration finishes. A receive on a closed channel returns the zero value immediately, so every subscriber is unblocked and treats that as 'something finished'.
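The broadcast property on its own, as a minimal standalone sketch (the `broadcast` function is made up for illustration): a single close unblocks every goroutine receiving on the channel, with no need to count subscribers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// broadcast starts n waiters on one shared channel, closes it once, and
// returns how many waiters were woken.
func broadcast(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var woken atomic.Int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until close(done), then yields the zero value
			woken.Add(1)
		}()
	}

	close(done) // one close wakes every current and future receiver
	wg.Wait()
	return int(woken.Load())
}

func main() {
	fmt.Println(broadcast(3)) // 3

	// A receive on a closed channel never blocks; ok reports false.
	ch := make(chan struct{})
	close(ch)
	_, ok := <-ch
	fmt.Println(ok) // false
}
```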

Contributor Author

Previously, the only way to get this channel from reorgDoneCh was to request a run of the reorg loop by passing it a new block header or telling it to promote txs. Here we don't actually want to do that; we simply want to wait on the result of the next run, whenever that happens (since we need to wait for a run to happen for a new block, and we can't simply pass it a new block here). So I created reorgSubscriptionCh, onto which users can push an empty struct to tell the reorg loop that they want to subscribe to the next run's result but don't want to start a new run.
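A toy version of the scheduling loop that shows the difference between requesting a run and only subscribing. `miniLoop` and its fields are hypothetical, and the "run" here is synchronous, whereas the real reorg loop runs in the background and tracks curDone/nextDone separately.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// miniLoop is a hypothetical, heavily simplified scheduler.
type miniLoop struct {
	runCh       chan struct{}      // asks for a run to be started
	subscribeCh chan struct{}      // asks only to observe the next run
	doneCh      chan chan struct{} // hands the subscription channel back
	quitCh      chan struct{}      // closed on shutdown
	runs        atomic.Int64       // number of completed runs
}

func newMiniLoop() *miniLoop {
	l := &miniLoop{
		runCh:       make(chan struct{}),
		subscribeCh: make(chan struct{}),
		doneCh:      make(chan chan struct{}),
		quitCh:      make(chan struct{}),
	}
	go l.schedule()
	return l
}

func (l *miniLoop) schedule() {
	nextDone := make(chan struct{})
	for {
		select {
		case <-l.runCh:
			l.runs.Add(1)   // stand-in for the real work
			close(nextDone) // broadcast completion to all subscribers
			nextDone = make(chan struct{})
		case <-l.subscribeCh:
			l.doneCh <- nextDone // hand out the channel; no run is scheduled
		case <-l.quitCh:
			return
		}
	}
}

// subscribe returns a channel that closes when the next run completes.
func (l *miniLoop) subscribe() (<-chan struct{}, error) {
	select {
	case l.subscribeCh <- struct{}{}:
		return <-l.doneCh, nil
	case <-l.quitCh:
		return nil, errors.New("shutdown")
	}
}

func main() {
	l := newMiniLoop()
	defer close(l.quitCh)

	sub, err := l.subscribe()
	if err != nil {
		panic(err)
	}
	fmt.Println("runs after subscribe:", l.runs.Load()) // 0

	l.runCh <- struct{}{} // someone else (e.g. a new block) requests a run
	<-sub
	fmt.Println("runs after wake-up:", l.runs.Load()) // 1
}
```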

select {
case pool.reorgSubscriptionCh <- struct{}{}:
return <-pool.reorgDoneCh, nil
case <-pool.reorgShutdownCh:
return nil, errors.New("shutdown")
}
}

// HasPendingAuth returns a flag indicating whether there are pending
// authorizations from the specific address cached in the pool.
func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool {
141 changes: 141 additions & 0 deletions mempool/txpool/legacypool/legacypool_test.go
@@ -17,6 +17,7 @@
package legacypool

import (
"context"
"crypto/ecdsa"
crand "crypto/rand"
"errors"
@@ -2668,6 +2669,146 @@ func TestRemoveTxTruncatePoolRace(t *testing.T) {
wg.Wait()
}

// TestWaitForReorgHeight tests that WaitForReorgHeight properly blocks until
// the reorg loop has completed for the specified height.
func TestWaitForReorgHeight(t *testing.T) {
t.Run("waits for reorg to complete", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

if pool.latestReorgHeight.Load() != 0 {
t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
}

// Create headers for the reset
oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(5), BaseFee: big.NewInt(10)}

var reorgCompleted atomic.Bool
var waitCompleted atomic.Bool
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
pool.WaitForReorgHeight(ctx, 5)
waitCompleted.Store(true)
}()

// Give the waiter a chance to subscribe
time.Sleep(50 * time.Millisecond)

wg.Add(1)
go func() {
pool.Reset(oldHead, newHead)
reorgCompleted.Store(true)
wg.Done()
}()

// Wait for waiters
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()
select {
case <-waitChan:
case <-time.After(time.Second):
t.Fatal("timeout waiting for waiters")
}

if pool.latestReorgHeight.Load() != newHead.Number.Int64() {
t.Errorf("expected height 5 after reorg, got %d", pool.latestReorgHeight.Load())
}
if !reorgCompleted.Load() {
t.Errorf("WaitForReorgHeight returned before reorg completed")
}
})

t.Run("multiple height wait", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

if pool.latestReorgHeight.Load() != 0 {
t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
}

var waitCompleted atomic.Bool
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
pool.WaitForReorgHeight(ctx, 10)
waitCompleted.Store(true)
}()

// Give the waiter a chance to subscribe
time.Sleep(50 * time.Millisecond)

go func() {
for i := 0; i < 20; i++ {
oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)}
pool.Reset(oldHead, newHead)
}
}()

// Wait for waiters
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()

select {
case <-waitChan:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for waiters")
}

if pool.latestReorgHeight.Load() < 10 {
t.Errorf("expected height >= 10 after reorg, got %d", pool.latestReorgHeight.Load())
}
})

t.Run("concurrent waiters", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
pool.WaitForReorgHeight(context.Background(), 7)
}(i)
}

// Give all waiters time to subscribe
time.Sleep(100 * time.Millisecond)

// Trigger a single reorg
oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(7), BaseFee: big.NewInt(10)}
pool.Reset(oldHead, newHead)

// Wait for all waiters to complete
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()
select {
case <-waitChan:
case <-time.After(2 * time.Second):
t.Errorf("not all waiters completed in 2 seconds")
}
})
}

// TestPromoteExecutablesRecheckTx tests that promoteExecutables properly removes
// a transaction from all pools if it fails the RecheckTxFn.
func TestPromoteExecutablesRecheckTx(t *testing.T) {