From c9beb5cfd2107f2e929f0250569959b42851e378 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Wed, 26 Nov 2025 16:58:01 -0500 Subject: [PATCH 1/7] block mempool.Select on reorg completion at height --- mempool/mempool.go | 4 ++ mempool/txpool/legacypool/legacypool.go | 92 +++++++++++++++++++------ 2 files changed, 74 insertions(+), 22 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 532631ed9..8e4e64454 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -287,6 +287,10 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme defer m.mtx.Unlock() ctx := sdk.UnwrapSDKContext(goCtx) + // Wait for the legacypool to Reset at >= blockHeight (this may have + // already happened), to ensure + m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight()) + evmIterator, cosmosIterator := m.getIterators(goCtx, i) combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 7139a3b9d..051558e3e 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -18,6 +18,7 @@ package legacypool import ( + "context" "errors" "maps" "math/big" @@ -245,13 +246,15 @@ type LegacyPool struct { all *lookup // All transactions to allow lookups priced *pricedList // All transactions sorted by price - reqResetCh chan *txpoolResetRequest - reqPromoteCh chan *accountSet - queueTxEventCh chan *types.Transaction - reorgDoneCh chan chan struct{} - reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - wg sync.WaitGroup // tracks loop, scheduleReorgLoop - initDoneCh chan struct{} // is closed once the pool is initialized (for tests) + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + reorgSubscriptionCh chan struct{} // notifies the reorg loop that a subscriber wants to wait on nextDone + wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) + latestReorgHeight atomic.Int64 // Latest height that the reorg loop has completed changesSinceReorg int // A counter for how many drops we've performed in-between reorg. @@ -270,22 +273,24 @@ func New(config Config, chain BlockChain) *LegacyPool { // Create the transaction pool with its initial settings pool := &LegacyPool{ - config: config, - chain: chain, - chainconfig: chain.Config(), - signer: types.LatestSigner(chain.Config()), - pending: make(map[common.Address]*list), - queue: make(map[common.Address]*list), - beats: make(map[common.Address]time.Time), - all: newLookup(), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - initDoneCh: make(chan struct{}), + config: config, + chain: chain, + chainconfig: chain.Config(), + signer: types.LatestSigner(chain.Config()), + pending: make(map[common.Address]*list), + queue: make(map[common.Address]*list), + beats: make(map[common.Address]time.Time), + all: newLookup(), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + reorgSubscriptionCh: make(chan struct{}), + initDoneCh: make(chan struct{}), } pool.priced = newPricedList(pool.all) + pool.latestReorgHeight.Store(0) return pool } @@ -1213,6 +1218,10 @@ func (pool *LegacyPool) scheduleReorgLoop() { // Run the background reorg and announcements go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) + if reset != nil && reset.newHead != nil { + // Update counter to new latest reorg reset height + } + // Prepare everything for the next round of reorg curDone, nextDone = nextDone, make(chan struct{}) launchNextRun = false @@ -1250,7 +1259,8 @@ func (pool *LegacyPool) scheduleReorgLoop() { queuedEvents[addr] = NewSortedMap() } queuedEvents[addr].Put(tx) - + case <-pool.reorgSubscriptionCh: + pool.reorgDoneCh <- nextDone case <-curDone: curDone = nil @@ -1269,6 +1279,9 @@ func (pool *LegacyPool) scheduleReorgLoop() { func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) { defer func(t0 time.Time) { reorgDurationTimer.Update(time.Since(t0)) + if reset != nil && reset.newHead != nil { + pool.latestReorgHeight.Store(reset.newHead.Number.Int64()) + } }(time.Now()) defer close(done) @@ -1895,6 +1908,41 @@ func (pool *LegacyPool) Clear() { pool.pendingNonces = newNoncer(pool.currentState) } +// WaitForReorgHeight blocks until the reorg loop has reset at a head with +// height >= height. If the context is cancelled or the pool is shutting down, +// this will also return. +func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) { + for pool.latestReorgHeight.Load() < height { + sub, err := pool.SubscribeToNextReorg() + if err != nil { + return + } + + // need to check again in case reorg has finished in between initial + // check and subscribing to next reorg + if pool.latestReorgHeight.Load() >= height { + return + } + + select { + case <-sub: + case <-ctx.Done(): + return + } + } +} + +// SubscribeToNextReorg returns a channel that will close when the next reorg +// loop completes. An error is returned if the loop is shutting down. +func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) { + select { + case pool.reorgSubscriptionCh <- struct{}{}: + return <-pool.reorgDoneCh, nil + case <-pool.reorgShutdownCh: + return nil, errors.New("shutdown") + } +} + // HasPendingAuth returns a flag indicating whether there are pending // authorizations from the specific address cached in the pool. func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool { From df1fb75ac7b656910783fbb5012da978e57b4ecf Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Thu, 27 Nov 2025 17:04:56 -0500 Subject: [PATCH 2/7] launch next run on subscribe and move updating latest height inside lock --- mempool/txpool/legacypool/legacypool.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 051558e3e..5c1a01adc 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -1218,10 +1218,6 @@ func (pool *LegacyPool) scheduleReorgLoop() { // Run the background reorg and announcements go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) - if reset != nil && reset.newHead != nil { - // Update counter to new latest reorg reset height - } - // Prepare everything for the next round of reorg curDone, nextDone = nextDone, make(chan struct{}) launchNextRun = false @@ -1260,6 +1256,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { } queuedEvents[addr].Put(tx) case <-pool.reorgSubscriptionCh: + launchNextRun = true pool.reorgDoneCh <- nextDone case <-curDone: curDone = nil @@ -1279,9 +1276,6 @@ func (pool *LegacyPool) scheduleReorgLoop() { func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) { defer func(t0 time.Time) { reorgDurationTimer.Update(time.Since(t0)) - if reset != nil && reset.newHead != nil { - pool.latestReorgHeight.Store(reset.newHead.Number.Int64()) - } }(time.Now()) defer close(done) @@ -1343,6 +1337,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg)) pool.changesSinceReorg = 0 // Reset change counter + if reset != nil && reset.newHead != nil { + pool.latestReorgHeight.Store(reset.newHead.Number.Int64()) + } pool.mu.Unlock() // Notify subsystems for newly added transactions From c701d97d4155c34ca10f4acb2e4cc26769eb7b48 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Thu, 27 Nov 2025 17:06:27 -0500 Subject: [PATCH 3/7] wait for latest reorg height tests --- mempool/txpool/legacypool/legacypool_test.go | 149 +++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/mempool/txpool/legacypool/legacypool_test.go b/mempool/txpool/legacypool/legacypool_test.go index 7e93fa5fb..f49e53744 100644 --- a/mempool/txpool/legacypool/legacypool_test.go +++ b/mempool/txpool/legacypool/legacypool_test.go @@ -17,6 +17,7 @@ package legacypool import ( + "context" "crypto/ecdsa" crand "crypto/rand" "errors" @@ -2668,6 +2669,154 @@ func TestRemoveTxTruncatePoolRace(t *testing.T) { wg.Wait() } +// TestWaitForReorgHeight tests that WaitForReorgHeight properly blocks until +// the reorg loop has completed for the specified height. +func TestWaitForReorgHeight(t *testing.T) { + t.Run("waits for reorg to complete", func(t *testing.T) { + pool, _ := setupPool() + defer pool.Close() + + if pool.latestReorgHeight.Load() != 0 { + t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load()) + } + + // Create headers for the reset + oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)} + newHead := &types.Header{Number: big.NewInt(5), BaseFee: big.NewInt(10)} + + var reorgCompleted atomic.Bool + var waitCompleted atomic.Bool + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + ctx := context.Background() + pool.WaitForReorgHeight(ctx, 5) + waitCompleted.Store(true) + }() + + // Give the waiter a chance to subscribe + time.Sleep(50 * time.Millisecond) + + wg.Add(1) + go func() { + pool.Reset(oldHead, newHead) + reorgCompleted.Store(true) + wg.Done() + }() + + // Wait for waiters + waitChan := make(chan struct{}) + go func() { + wg.Wait() + close(waitChan) + }() + select { + case <-waitChan: + case <-time.After(time.Second): + t.Fatal("timeout waiting for waiters") + } + + if pool.latestReorgHeight.Load() != newHead.Number.Int64() { + t.Errorf("expected height 5 after reorg, got %d", pool.latestReorgHeight.Load()) + } + if !reorgCompleted.Load() { + t.Errorf("WaitForReorgHeight returned before reorg completed") + } + }) + + t.Run("multiple height wait", func(t *testing.T) { + pool, _ := setupPool() + defer pool.Close() + + if pool.latestReorgHeight.Load() != 0 { + t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load()) + } + + var reorgCompleted atomic.Bool + var waitCompleted atomic.Bool + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + ctx := context.Background() + pool.WaitForReorgHeight(ctx, 10) + waitCompleted.Store(true) + }() + + // Give the waiter a chance to subscribe + time.Sleep(50 * time.Millisecond) + + wg.Add(1) + go func() { + for i := 0; i < 10; i++ { + oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)} + newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)} + pool.Reset(oldHead, newHead) + } + reorgCompleted.Store(true) + fmt.Println("all resets done") + wg.Done() + }() + + // Wait for waiters + waitChan := make(chan struct{}) + go func() { + wg.Wait() + close(waitChan) + }() + + select { + case <-waitChan: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for waiters") + } + + if pool.latestReorgHeight.Load() != 10 { + t.Errorf("expected height 10 after reorg, got %d", pool.latestReorgHeight.Load()) + } + if !reorgCompleted.Load() { + t.Errorf("WaitForReorgHeight returned before reorg completed") + } + }) + + t.Run("concurrent waiters", func(t *testing.T) { + pool, _ := setupPool() + defer pool.Close() + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + pool.WaitForReorgHeight(context.Background(), 7) + }(i) + } + + // Give all waiters time to subscribe + time.Sleep(100 * time.Millisecond) + + // Trigger a single reorg + oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)} + newHead := &types.Header{Number: big.NewInt(7), BaseFee: big.NewInt(10)} + pool.Reset(oldHead, newHead) + + // Wait for all waiters to complete + waitChan := make(chan struct{}) + go func() { + wg.Wait() + close(waitChan) + }() + select { + case <-waitChan: + case <-time.After(2 * time.Second): + t.Errorf("not all waiters completed in 2 seconds") + } + }) +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } From 4e47fd460b2d6aaf6812f4137750104720aadc13 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Fri, 28 Nov 2025 18:30:49 -0500 Subject: [PATCH 4/7] typo --- mempool/mempool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index e907becdc..e19e3a189 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -291,7 +291,7 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme ctx := sdk.UnwrapSDKContext(goCtx) // Wait for the legacypool to Reset at >= blockHeight (this may have - // already happened), to ensure + // already happened), to ensure all txs in pending pool are valid. m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight()) evmIterator, cosmosIterator := m.getIterators(goCtx, i) From d38f0dc5a39bb277239b5c4180b3d440600e4336 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Fri, 28 Nov 2025 18:34:16 -0500 Subject: [PATCH 5/7] comemnt --- mempool/txpool/legacypool/legacypool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 31f77b303..3bd8433c4 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -1978,6 +1978,8 @@ func (pool *LegacyPool) Clear() { // this will also return. func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) { for pool.latestReorgHeight.Load() < height { + // reorg loop has not run at the target height, subscribe to the + // outcome of the next reorg loop iteration to know when to check again sub, err := pool.SubscribeToNextReorg() if err != nil { return From 56d83159dce51d5391c60a4cd5e0907199ff2055 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Fri, 28 Nov 2025 18:36:22 -0500 Subject: [PATCH 6/7] remove launchNextRun = true during arbitrary subscriptions and fix test --- mempool/txpool/legacypool/legacypool.go | 1 - mempool/txpool/legacypool/legacypool_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index 3bd8433c4..1563bf94c 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -1268,7 +1268,6 @@ func (pool *LegacyPool) scheduleReorgLoop() { } queuedEvents[addr].Put(tx) case <-pool.reorgSubscriptionCh: - launchNextRun = true pool.reorgDoneCh <- nextDone case <-curDone: curDone = nil diff --git a/mempool/txpool/legacypool/legacypool_test.go b/mempool/txpool/legacypool/legacypool_test.go index c92caa287..50f61872f 100644 --- a/mempool/txpool/legacypool/legacypool_test.go +++ b/mempool/txpool/legacypool/legacypool_test.go @@ -2751,7 +2751,7 @@ func TestWaitForReorgHeight(t *testing.T) { wg.Add(1) go func() { - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)} newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)} pool.Reset(oldHead, newHead) From d2ba034e0609fbd27f30c3878688abae5f442092 Mon Sep 17 00:00:00 2001 From: Matt Acciai Date: Mon, 1 Dec 2025 12:15:41 -0500 Subject: [PATCH 7/7] fix test --- mempool/txpool/legacypool/legacypool_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/mempool/txpool/legacypool/legacypool_test.go b/mempool/txpool/legacypool/legacypool_test.go index 50f61872f..d2ca8751e 100644 --- a/mempool/txpool/legacypool/legacypool_test.go +++ b/mempool/txpool/legacypool/legacypool_test.go @@ -2734,7 +2734,6 @@ func TestWaitForReorgHeight(t *testing.T) { t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load()) } - var reorgCompleted atomic.Bool var waitCompleted atomic.Bool var wg sync.WaitGroup @@ -2749,16 +2748,12 @@ func TestWaitForReorgHeight(t *testing.T) { // Give the waiter a chance to subscribe time.Sleep(50 * time.Millisecond) - wg.Add(1) go func() { for i := 0; i < 20; i++ { oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)} newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)} pool.Reset(oldHead, newHead) } - reorgCompleted.Store(true) - fmt.Println("all resets done") - wg.Done() }() // Wait for waiters @@ -2774,11 +2769,8 @@ func TestWaitForReorgHeight(t *testing.T) { t.Fatal("timeout waiting for waiters") } - if pool.latestReorgHeight.Load() != 10 { - t.Errorf("expected height 10 after reorg, got %d", pool.latestReorgHeight.Load()) - } - if !reorgCompleted.Load() { - t.Errorf("WaitForReorgHeight returned before reorg completed") + if pool.latestReorgHeight.Load() < 10 { + t.Errorf("expected height >= 10 after reorg, got %d", pool.latestReorgHeight.Load()) } })