diff --git a/mempool/mempool.go b/mempool/mempool.go
index 10aadf325..e19e3a189 100644
--- a/mempool/mempool.go
+++ b/mempool/mempool.go
@@ -290,6 +290,10 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme
 	defer m.mtx.Unlock()
 
 	ctx := sdk.UnwrapSDKContext(goCtx)
+	// Wait for the legacypool to Reset at a height >= blockHeight (this may
+	// have already happened), to ensure all txs in the pending pool are valid.
+	m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight())
+
 	evmIterator, cosmosIterator := m.getIterators(goCtx, i)
 
 	combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain)
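The wait added to Select synchronizes block proposal with the pool's asynchronous Reset, so the pending set the iterators walk cannot contain transactions already invalidated by the previous block. Note that the context passed in may carry no deadline, in which case WaitForReorgHeight blocks until the reorg loop catches up or shuts down. A caller that cannot afford an unbounded wait could cap it; a minimal sketch of such a guard (hypothetical, not part of this patch, and the 500ms budget is an invented value):

	// Hypothetical guard around the call site in Select: bound the wait so a
	// stalled reorg cannot delay block proposal indefinitely.
	waitCtx, cancel := context.WithTimeout(goCtx, 500*time.Millisecond)
	defer cancel()
	m.legacyTxPool.WaitForReorgHeight(waitCtx, ctx.BlockHeight())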
diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go
index 56f364446..1563bf94c 100644
--- a/mempool/txpool/legacypool/legacypool.go
+++ b/mempool/txpool/legacypool/legacypool.go
@@ -18,6 +18,7 @@ package legacypool
 
 import (
+	"context"
 	"errors"
 	"maps"
 	"math/big"
@@ -255,13 +256,15 @@ type LegacyPool struct {
 	all    *lookup     // All transactions to allow lookups
 	priced *pricedList // All transactions sorted by price
 
-	reqResetCh      chan *txpoolResetRequest
-	reqPromoteCh    chan *accountSet
-	queueTxEventCh  chan *types.Transaction
-	reorgDoneCh     chan chan struct{}
-	reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop
-	wg              sync.WaitGroup // tracks loop, scheduleReorgLoop
-	initDoneCh      chan struct{}  // is closed once the pool is initialized (for tests)
+	reqResetCh          chan *txpoolResetRequest
+	reqPromoteCh        chan *accountSet
+	queueTxEventCh      chan *types.Transaction
+	reorgDoneCh         chan chan struct{}
+	reorgShutdownCh     chan struct{}  // requests shutdown of scheduleReorgLoop
+	reorgSubscriptionCh chan struct{}  // notifies the reorg loop that a subscriber wants to wait on nextDone
+	wg                  sync.WaitGroup // tracks loop, scheduleReorgLoop
+	initDoneCh          chan struct{}  // is closed once the pool is initialized (for tests)
+	latestReorgHeight   atomic.Int64   // latest head height at which the reorg loop has completed a reset
 
 	changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
 }
@@ -282,22 +285,24 @@ func New(config Config, chain BlockChain) *LegacyPool {
 
 	// Create the transaction pool with its initial settings
 	pool := &LegacyPool{
-		config:          config,
-		chain:           chain,
-		chainconfig:     chain.Config(),
-		signer:          types.LatestSigner(chain.Config()),
-		pending:         make(map[common.Address]*list),
-		queue:           make(map[common.Address]*list),
-		beats:           make(map[common.Address]time.Time),
-		all:             newLookup(),
-		reqResetCh:      make(chan *txpoolResetRequest),
-		reqPromoteCh:    make(chan *accountSet),
-		queueTxEventCh:  make(chan *types.Transaction),
-		reorgDoneCh:     make(chan chan struct{}),
-		reorgShutdownCh: make(chan struct{}),
-		initDoneCh:      make(chan struct{}),
+		config:              config,
+		chain:               chain,
+		chainconfig:         chain.Config(),
+		signer:              types.LatestSigner(chain.Config()),
+		pending:             make(map[common.Address]*list),
+		queue:               make(map[common.Address]*list),
+		beats:               make(map[common.Address]time.Time),
+		all:                 newLookup(),
+		reqResetCh:          make(chan *txpoolResetRequest),
+		reqPromoteCh:        make(chan *accountSet),
+		queueTxEventCh:      make(chan *types.Transaction),
+		reorgDoneCh:         make(chan chan struct{}),
+		reorgShutdownCh:     make(chan struct{}),
+		reorgSubscriptionCh: make(chan struct{}),
+		initDoneCh:          make(chan struct{}),
 	}
 	pool.priced = newPricedList(pool.all)
+	pool.latestReorgHeight.Store(0)
 
 	return pool
 }
@@ -1262,7 +1267,8 @@ func (pool *LegacyPool) scheduleReorgLoop() {
 				queuedEvents[addr] = NewSortedMap()
 			}
 			queuedEvents[addr].Put(tx)
-
+		case <-pool.reorgSubscriptionCh:
+			pool.reorgDoneCh <- nextDone
 		case <-curDone:
 			curDone = nil
 
@@ -1342,6 +1348,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
 	dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
 	pool.changesSinceReorg = 0 // Reset change counter
+	if reset != nil && reset.newHead != nil {
+		pool.latestReorgHeight.Store(reset.newHead.Number.Int64())
+	}
 	pool.mu.Unlock()
 
 	// Notify subsystems for newly added transactions
@@ -1963,6 +1972,43 @@ func (pool *LegacyPool) Clear() {
 	pool.pendingNonces = newNoncer(pool.currentState)
 }
 
+// WaitForReorgHeight blocks until the reorg loop has completed a reset at a
+// head with number >= height. If the context is cancelled or the pool is
+// shutting down, this also returns early.
+func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) {
+	for pool.latestReorgHeight.Load() < height {
+		// The reorg loop has not yet run at the target height; subscribe to
+		// the outcome of its next iteration to know when to check again.
+		sub, err := pool.SubscribeToNextReorg()
+		if err != nil {
+			return
+		}
+
+		// Check again in case a reorg finished between the initial check
+		// and the subscription.
+		if pool.latestReorgHeight.Load() >= height {
+			return
+		}
+
+		select {
+		case <-sub:
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// SubscribeToNextReorg returns a channel that is closed when the next reorg
+// loop iteration completes. An error is returned if the loop is shutting down.
+func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) {
+	select {
+	case pool.reorgSubscriptionCh <- struct{}{}:
+		return <-pool.reorgDoneCh, nil
+	case <-pool.reorgShutdownCh:
+		return nil, errors.New("shutdown")
+	}
+}
+
 // HasPendingAuth returns a flag indicating whether there are pending
 // authorizations from the specific address cached in the pool.
 func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool {
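The core of the change is the handshake between SubscribeToNextReorg and scheduleReorgLoop: the subscriber signals on reorgSubscriptionCh, and the loop replies on reorgDoneCh with nextDone, the channel it will close when the next reorg iteration completes. Because the loop itself hands out the channel, a subscriber can never end up holding a channel from an iteration that already finished, and the second height check in WaitForReorgHeight closes the window between the initial check and the subscription. A self-contained sketch of the same pattern, with illustrative names and simulated work rather than the pool's actual API:

package main

import (
	"fmt"
	"time"
)

func main() {
	subCh := make(chan struct{})        // plays the role of reorgSubscriptionCh
	replyCh := make(chan chan struct{}) // plays the role of reorgDoneCh

	go func() { // plays the role of scheduleReorgLoop
		for round := 1; round <= 2; round++ {
			nextDone := make(chan struct{})
			work := time.After(50 * time.Millisecond) // simulated reorg work
		serve:
			for {
				select {
				case <-subCh: // hand this round's done-channel to a subscriber
					replyCh <- nextDone
				case <-work: // this round's reorg has finished
					break serve
				}
			}
			close(nextDone) // wake every subscriber holding this channel
		}
	}()

	// Subscriber side: request, receive the round's channel, wait for close.
	subCh <- struct{}{}
	done := <-replyCh
	<-done
	fmt.Println("observed a completed round")
}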
diff --git a/mempool/txpool/legacypool/legacypool_test.go b/mempool/txpool/legacypool/legacypool_test.go
index 0ae483cfe..d2ca8751e 100644
--- a/mempool/txpool/legacypool/legacypool_test.go
+++ b/mempool/txpool/legacypool/legacypool_test.go
@@ -17,6 +17,7 @@ package legacypool
 
 import (
+	"context"
 	"crypto/ecdsa"
 	crand "crypto/rand"
 	"errors"
@@ -2668,6 +2669,146 @@ func TestRemoveTxTruncatePoolRace(t *testing.T) {
 	wg.Wait()
 }
 
+// TestWaitForReorgHeight tests that WaitForReorgHeight properly blocks until
+// the reorg loop has completed a reset for the specified height.
+func TestWaitForReorgHeight(t *testing.T) {
+	t.Run("waits for reorg to complete", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		if pool.latestReorgHeight.Load() != 0 {
+			t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
+		}
+
+		// Create headers for the reset
+		oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
+		newHead := &types.Header{Number: big.NewInt(5), BaseFee: big.NewInt(10)}
+
+		var reorgCompleted atomic.Bool
+		var waitCompleted atomic.Bool
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ctx := context.Background()
+			pool.WaitForReorgHeight(ctx, 5)
+			waitCompleted.Store(true)
+		}()
+
+		// Give the waiter a chance to subscribe
+		time.Sleep(50 * time.Millisecond)
+
+		wg.Add(1)
+		go func() {
+			pool.Reset(oldHead, newHead)
+			reorgCompleted.Store(true)
+			wg.Done()
+		}()
+
+		// Wait for waiters
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+		select {
+		case <-waitChan:
+		case <-time.After(time.Second):
+			t.Fatal("timeout waiting for waiters")
+		}
+
+		if pool.latestReorgHeight.Load() != newHead.Number.Int64() {
+			t.Errorf("expected height 5 after reorg, got %d", pool.latestReorgHeight.Load())
+		}
+		if !reorgCompleted.Load() || !waitCompleted.Load() {
+			t.Errorf("expected both the reorg and WaitForReorgHeight to complete")
+		}
+	})
+
+	t.Run("multiple height wait", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		if pool.latestReorgHeight.Load() != 0 {
+			t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
+		}
+
+		var waitCompleted atomic.Bool
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ctx := context.Background()
+			pool.WaitForReorgHeight(ctx, 10)
+			waitCompleted.Store(true)
+		}()
+
+		// Give the waiter a chance to subscribe
+		time.Sleep(50 * time.Millisecond)
+
+		go func() {
+			for i := 0; i < 20; i++ {
+				oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)}
+				newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)}
+				pool.Reset(oldHead, newHead)
+			}
+		}()
+
+		// Wait for waiters
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+
+		select {
+		case <-waitChan:
+		case <-time.After(2 * time.Second):
+			t.Fatal("timeout waiting for waiters")
+		}
+
+		if !waitCompleted.Load() || pool.latestReorgHeight.Load() < 10 {
+			t.Errorf("expected height >= 10 after reorg, got %d", pool.latestReorgHeight.Load())
+		}
+	})
+
+	t.Run("concurrent waiters", func(t *testing.T) {
+		pool, _ := setupPool()
+		defer pool.Close()
+
+		var wg sync.WaitGroup
+		for i := 0; i < 5; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				pool.WaitForReorgHeight(context.Background(), 7)
+			}()
+		}
+
+		// Give all waiters time to subscribe
+		time.Sleep(100 * time.Millisecond)
+
+		// Trigger a single reorg
+		oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
+		newHead := &types.Header{Number: big.NewInt(7), BaseFee: big.NewInt(10)}
+		pool.Reset(oldHead, newHead)
+
+		// Wait for all waiters to complete
+		waitChan := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(waitChan)
+		}()
+		select {
+		case <-waitChan:
+		case <-time.After(2 * time.Second):
+			t.Errorf("not all waiters completed in 2 seconds")
+		}
+	})
+}
+
 // TestPromoteExecutablesRecheckTx tests that promoteExecutables properly removes
 // a transaction from all pools if it fails the RecheckTxFn.
 func TestPromoteExecutablesRecheckTx(t *testing.T) {
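All three subtests order the waiter and the reset with fixed sleeps, so they are timing-sensitive, and the feature itself is a channel handshake across goroutines; running them under the race detector is a cheap way to catch regressions. One way to invoke just these tests with standard Go tooling:

	go test ./mempool/txpool/legacypool/ -run TestWaitForReorgHeight -race -count=1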