feat: LRU channel acquisition with per-channel claim locks #107
Conversation
Walkthrough

This PR replaces a global mutex-based pool locking mechanism with per-channel claim locks. It introduces lock-retention behavior on submission timeout with configurable cooldown support, refactors lock-check utilities to use KV list-keys operations, and adds LRU-based channel selection ordering for acquisition. Handler logic gains an uncertainOutcome flag to conditionally apply the new cooldown-based release behavior.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Worker1 as Worker 1
    participant Worker2 as Worker 2
    participant Pool as ChannelPool
    participant KV as KV Store
    Note over Worker1,Worker2: New Acquire Flow
    rect rgba(100, 150, 200, 0.5)
        Note over Pool: Read Phase
        Worker1->>KV: listKeys("testnet:channel:inuse:*")
        KV-->>Worker1: unlocked channels
        Worker1->>KV: get LRU map
        KV-->>Worker1: LRU timestamps
    end
    rect rgba(150, 100, 200, 0.5)
        Note over Pool: Per-Channel Claim Phase
        Worker1->>KV: set claim lock on channel p1
        KV-->>Worker1: success + token
        Worker1->>KV: update LRU map
        KV-->>Worker1: ack
    end
    Worker1->>Pool: acquire() returns p1
    rect rgba(200, 150, 100, 0.5)
        Note over Worker1: Submit with potential timeout
        Worker1->>Worker1: submit transaction
        Note over Worker1: WAIT_TIMEOUT or unknown status
        Worker1->>Pool: releaseWithCooldown(lock, cooldownMs)
        Pool->>KV: extend claim lock TTL
        KV-->>Pool: ack
    end
    Worker2->>KV: listKeys() during cooldown
    KV-->>Worker2: p1 still locked
    Worker2->>Pool: acquire() returns different channel
```

```mermaid
sequenceDiagram
    participant Handler as Handler
    participant ChannelPool as ChannelPool
    participant KV as KV Store
    Note over Handler,KV: Handler Finalization Path
    rect rgba(100, 200, 150, 0.5)
        Note over Handler: Submit Result Processing
        Handler->>Handler: check submit result status
        alt uncertainOutcome (unknown/timeout)
            Handler->>Handler: set uncertainOutcome = true
            Handler->>Handler: clear sequence
            Handler->>ChannelPool: extendLock (on WAIT_TIMEOUT)
        else known status
            Handler->>Handler: uncertainOutcome = false
        end
    end
    rect rgba(200, 100, 150, 0.5)
        Note over Handler: Finalization Release
        alt uncertainOutcome is true
            Handler->>ChannelPool: releaseWithCooldown(poolLock)
            ChannelPool->>KV: extend lock with cooldown TTL
        else standard release
            Handler->>ChannelPool: release(poolLock)
            ChannelPool->>KV: remove lock immediately
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks: 2 passed, 1 failed (1 warning)
Pull request overview
This PR refactors channel acquisition to remove the global acquire mutex and replace it with per-channel claim locks plus LRU-based candidate selection, reducing contention at high channel counts. It also adds a “cooldown” release mode for uncertain outcomes and updates the test KV to better model atomic locking.
Changes:
- Replace global acquire mutex with a read-phase + per-channel claim lock acquisition flow, selecting channels in LRU order.
- Add releaseWithCooldown and update the handler to use it for uncertain outcomes.
- Expand/adjust tests and update FakeKV to support atomic withLock and prefix listKeys behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/plugin/pool.ts | Implements LRU-based selection and per-channel claim locks; adds cooldown release. |
| src/plugin/constants.ts | Introduces new pool timing constants for claim locks, retries, cooldown, and LRU map TTL. |
| src/plugin/handler.ts | Uses cooldown release for uncertain outcomes instead of immediate release. |
| src/plugin/management.ts | Switches lock scanning from N exists() calls to a single listKeys() scan. |
| test/helpers/fakeKV.ts | Updates withLock to be atomic for concurrency tests; adds prefix-filtered listKeys. |
| test/pool.test.ts | Adds LRU, cooldown, and concurrency/claim-lock behavior tests. |
| test/pool.busy.test.ts | Updates "busy mutex" tests to "busy claim locks" and adds scan/claim race test. |
| test/management.test.ts | Updates lock-check failure test to reflect listKeys-based scanning. |
| test/handler.sequence-cache.test.ts | Updates pool mock to include releaseWithCooldown and adjusts expectations. |
On src/plugin/pool.ts (@@ -67,13 +62,9 @@):
The pool-capacity diagnostics still use the reason string all_channels_busy_or_mutex_contention, but the global mutex has been removed. This makes the error details misleading for operators. Consider renaming that reason value (and updating the corresponding tests) to reflect claim-lock contention / all-channels-busy without referencing a mutex.
On src/plugin/management.ts:

```diff
  code: 'KV_ERROR',
  status: HTTP_STATUS.INTERNAL_SERVER_ERROR,
- details: { relayerId: id, key, message: error instanceof Error ? error.message : String(error) },
+ details: { message: error instanceof Error ? error.message : String(error) },
```

getLockedRelayerIds throws a KV error while checking relayer locks, but the error details no longer include the key/pattern being scanned. Including at least the prefix/pattern (e.g. `${prefix}*`) in details would make operational debugging much easier when listKeys is unavailable or misconfigured.

Suggested change:

```diff
- details: { message: error instanceof Error ? error.message : String(error) },
+ details: {
+   prefix,
+   pattern: `${prefix}*`,
+   message: error instanceof Error ? error.message : String(error),
+ },
```
On src/plugin/pool.ts:

```ts
  lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
} catch (err) {
  // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
  // This is expensive with many channels — log so persistent failures are observable.
  console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
  lruMap = {};
  const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
  lockedSet = new Set(ids.filter((_, i) => results[i]));
}

const unlocked = ids.filter(id => !lockedSet.has(id));
```

Prettier's default arrowParens setting is always, so callbacks like lockedKeys.map(k => ...) will fail the repo's prettier/prettier lint rule. Update these arrow functions (in this block and similar ones below) to include parentheses around single params, or run Prettier on the file.

Suggested change:

```ts
  lockedSet = new Set(lockedKeys.map((k) => k.slice(lockPrefix.length)));
} catch (err) {
  // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
  // This is expensive with many channels — log so persistent failures are observable.
  console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
  lruMap = {};
  const results = await Promise.all(ids.map((id) => this.kv.exists(this.lockKey(id))));
  lockedSet = new Set(ids.filter((_, i) => results[i]));
}

const unlocked = ids.filter((id) => !lockedSet.has(id));
```
On test/pool.test.ts:

```ts
  pool.acquire(defaultOptions),
]);

const relayerIds = results.map(r => r.relayerId);
```

This arrow callback is missing parentheses around the single parameter (results.map(r => ...)). With the repo's Prettier setup, this will be reformatted and can fail prettier/prettier lint; please run Prettier or update to (r) => ....

Suggested change:

```ts
const relayerIds = results.map((r) => r.relayerId);
```
```ts
const successes = results.filter(r => r.ok).map(r => (r as any).lock);
const failures = results.filter(r => !r.ok);
```

Several arrow callbacks here omit parentheses around single parameters (e.g., filter(r => ...), map(r => ...)). The repo enforces Prettier via ESLint, so these will cause formatting/lint failures unless updated (or the file is run through Prettier).

Suggested change:

```ts
const successes = results.filter((r) => r.ok).map((r) => (r as any).lock);
const failures = results.filter((r) => !r.ok);
```
On src/plugin/pool.ts:

```ts
let lruMap: Record<string, number>;
try {
  const [lockedKeys, lruMapRaw] = await Promise.all([
    this.kv.listKeys(`${lockPrefix}*`),
    this.kv.get<Record<string, number>>(this.lruMapKey()),
  ]);
  lruMap = lruMapRaw ?? {};
  lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
} catch (err) {
  // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
  // This is expensive with many channels — log so persistent failures are observable.
  console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
  lruMap = {};
  const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
  lockedSet = new Set(ids.filter((_, i) => results[i]));
}
```

The Promise.all([listKeys, get(lru-map)]) is wrapped in a single try/catch, but the catch block assumes listKeys failed. If get(lru-map) throws (or only one side fails), you still fall back to per-channel exists() checks, drop the LRU map, and log a misleading "listKeys failed" warning. Consider handling listKeys and lruMap reads separately so you can (a) keep LRU ordering when only listKeys fails, (b) avoid the O(N) fallback when listKeys succeeds, and (c) log the correct failing operation.

Suggested change:

```ts
let lruMap: Record<string, number> = {};
// Determine which channels are currently locked.
let lockedKeys: string[] | null = null;
try {
  lockedKeys = await this.kv.listKeys(`${lockPrefix}*`);
} catch (err) {
  // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
  // This is expensive with many channels — log so persistent failures are observable.
  console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
}
if (lockedKeys === null) {
  const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
  lockedSet = new Set(ids.filter((_, i) => results[i]));
} else {
  lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
}
// Load LRU map separately so that failures here do not affect lock detection.
try {
  const lruMapRaw = await this.kv.get<Record<string, number>>(this.lruMapKey());
  if (lruMapRaw) {
    lruMap = lruMapRaw;
  }
} catch (err) {
  console.warn('[channels] failed to read LRU map; proceeding without LRU ordering', err);
}
```
Actionable comments posted: 4

🧹 Nitpick comments (2)

test/pool.test.ts (1)

130-141: The cooldown test doesn't verify the cooldown duration. An immediate exists() check still passes if releaseWithCooldown() leaves the original lock TTL unchanged. Since the API's value is the shorter block window, please assert expiry after a 1s cooldown with fake timers or a short wait.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `@test/pool.test.ts` around lines 130-141, the test for ChannelPool.releaseWithCooldown currently only checks immediate existence and doesn't verify the cooldown duration; update the test (involving ChannelPool, FakeKV, acquire, releaseWithCooldown and defaultOptions) to assert that the key expires after the short cooldown by either using fake timers (advance timers by ~1s + small buffer) or awaiting a short real delay and then calling kv.exists(`testnet:channel:in-use:${lock.relayerId}`) expecting false; ensure you still set the initial relayerIds and call acquire before releaseWithCooldown.

test/handler.sequence-cache.test.ts (1)

266-276: Add a positive test for the cooldown-release branch. This only proves releaseWithCooldown() is not used for ONCHAIN_FAILED. The new branch in src/plugin/handler.ts that uses cooldown release for unexpected submit statuses still has no direct coverage.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `@test/handler.sequence-cache.test.ts` around lines 266-276, add a positive unit test that exercises the branch which calls poolSpies.releaseWithCooldown when submit returns an unexpected status: stub mockSubmit to resolve to a submit response object representing the "unexpected" status (rather than rejecting with ONCHAIN_FAILED), invoke handler(ctx) and assert that poolSpies.releaseWithCooldown was called with the expected arguments (and that poolSpies.release was not called), using the same context construction (makeContext(kv)) and existing spies (poolSpies.releaseWithCooldown, poolSpies.release) to target the new branch in src/plugin/handler.ts.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2bce8515-89c9-419f-bb13-81ff01f25832
📒 Files selected for processing (9)

- src/plugin/constants.ts
- src/plugin/handler.ts
- src/plugin/management.ts
- src/plugin/pool.ts
- test/handler.sequence-cache.test.ts
- test/helpers/fakeKV.ts
- test/management.test.ts
- test/pool.busy.test.ts
- test/pool.test.ts
On src/plugin/pool.ts:

```ts
try {
  const [lockedKeys, lruMapRaw] = await Promise.all([
    this.kv.listKeys(`${lockPrefix}*`),
    this.kv.get<Record<string, number>>(this.lruMapKey()),
  ]);
  lruMap = lruMapRaw ?? {};
  lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
} catch (err) {
  // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
  // This is expensive with many channels — log so persistent failures are observable.
  console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
  lruMap = {};
  const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
  lockedSet = new Set(ids.filter((_, i) => results[i]));
}
```

Split the scan-state reads before falling back to exists().

Right now a transient lruMapKey() read failure takes the same path as a listKeys() failure, so we throw away a good lock scan and pay the O(N) fallback anyway. That turns an ordering-only problem into a full acquire-path regression.
🛠️ Proposed fix
- let lockedSet: Set<string>;
- let lruMap: Record<string, number>;
- try {
- const [lockedKeys, lruMapRaw] = await Promise.all([
- this.kv.listKeys(`${lockPrefix}*`),
- this.kv.get<Record<string, number>>(this.lruMapKey()),
- ]);
- lruMap = lruMapRaw ?? {};
- lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
- } catch (err) {
- // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
- // This is expensive with many channels — log so persistent failures are observable.
- console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
- lruMap = {};
- const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
- lockedSet = new Set(ids.filter((_, i) => results[i]));
- }
+ let lruMap: Record<string, number> = {};
+ try {
+ lruMap = (await this.kv.get<Record<string, number>>(this.lruMapKey())) ?? {};
+ } catch (err) {
+ console.warn('[channels] LRU map read failed, using empty ordering map', err);
+ }
+
+ let lockedSet = new Set<string>();
+ try {
+ const lockedKeys = await this.kv.listKeys(`${lockPrefix}*`);
+ lockedSet = new Set(lockedKeys.map(k => k.slice(lockPrefix.length)));
+ } catch (err) {
+ // Fallback: if listKeys fails, degrade to O(N) per-channel exists checks.
+ // This is expensive with many channels — log so persistent failures are observable.
+ console.warn('[channels] listKeys failed, falling back to per-channel exists checks', err);
+ const results = await Promise.all(ids.map(id => this.kv.exists(this.lockKey(id))));
+ lockedSet = new Set(ids.filter((_, i) => results[i]));
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/plugin/pool.ts` around lines 83 - 97, The current try/catch treats any
error from Promise.all([this.kv.listKeys(...), this.kv.get(this.lruMapKey())])
as a listKeys failure and falls back to the expensive per-id this.kv.exists
path; instead, split the reads so you first await
this.kv.listKeys(`${lockPrefix}*`) and, if that succeeds, independently attempt
this.kv.get(this.lruMapKey()) and tolerate errors from the get by defaulting
lruMap = {} (while still using the lockedKeys result to build lockedSet); only
when listKeys actually throws should you run the expensive
Promise.all(ids.map(id => this.kv.exists(this.lockKey(id)))). This change keeps
successful lock scans (lockedKeys → lockedSet) even if reading the LRU map
fails.
On src/plugin/pool.ts:

```ts
// Best-effort LRU update — re-fetch the map to minimize last-writer-wins
// staleness (the read-phase copy may be many retries old). Concurrent workers
// claiming different channels can still race on get→set, but the window is
// narrow. Only affects ordering precision, not claim correctness.
try {
  const freshLru = await this.kv.get<Record<string, number>>(this.lruMapKey()) ?? {};
  freshLru[relayerId] = Date.now();
  await this.kv.set(this.lruMapKey(), freshLru, { ttlSec: POOL.LRU_MAP_TTL_SECONDS });
```

Guard the shared LRU document against lost updates.
This is a read-modify-write on one KV document under only a per-channel lock. Two workers claiming different channels can overwrite each other here, so the map stops reflecting real recency under concurrency.
🛠️ Proposed fix
try {
- const freshLru = await this.kv.get<Record<string, number>>(this.lruMapKey()) ?? {};
- freshLru[relayerId] = Date.now();
- await this.kv.set(this.lruMapKey(), freshLru, { ttlSec: POOL.LRU_MAP_TTL_SECONDS });
+ await this.kv.withLock(
+ this.lruMapUpdateLockKey(),
+ async () => {
+ const freshLru = (await this.kv.get<Record<string, number>>(this.lruMapKey())) ?? {};
+ freshLru[relayerId] = Date.now();
+ await this.kv.set(this.lruMapKey(), freshLru, { ttlSec: POOL.LRU_MAP_TTL_SECONDS });
+ },
+ { ttlSec: POOL.CLAIM_LOCK_TTL_SECONDS },
+ );
} catch { /* ordering-only */ }Outside this range, add a dedicated lock key helper for the shared document:
private lruMapUpdateLockKey(): string {
return `${this.network}:channel:lru-map:update`;
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/plugin/pool.ts` around lines 133 - 140, The read-modify-write on the
shared LRU map (calls to this.kv.get(this.lruMapKey()) and this.kv.set(...,
POOL.LRU_MAP_TTL_SECONDS)) can lose updates under concurrent workers; add a
dedicated lock helper (e.g. lruMapUpdateLockKey() returning
`${this.network}:channel:lru-map:update`) and acquire that lock around the
freshLru read, modification (freshLru[relayerId] = Date.now()), and set, using
the existing lock acquire/release pattern (try/finally) so updates to the shared
LRU document are serialized while preserving the same TTL and behavior.
On test/helpers/fakeKV.ts:

```diff
  const ttl = opts?.ttlSec && opts.ttlSec > 0 ? opts.ttlSec : 1;
- await this.set(key, { token: 'lock' }, { ttlSec: ttl });
+ this.store.set(key, { value: { token: 'lock' }, expiresAt: now + ttl * 1000 });
  try {
    return await fn();
  } finally {
-   await this.del(key);
+   this.store.delete(key);
```

Guard finally with lock ownership.
If fn() outlives ttlSec, a second caller can reacquire key after expiry. The first caller then unconditionally deletes that newer lock in finally, which breaks the exclusivity this helper is supposed to simulate.
Possible fix
- const ttl = opts?.ttlSec && opts.ttlSec > 0 ? opts.ttlSec : 1;
- this.store.set(key, { value: { token: 'lock' }, expiresAt: now + ttl * 1000 });
+ const ttl = opts?.ttlSec && opts.ttlSec > 0 ? opts.ttlSec : 1;
+ const token = `${key}:${now}:${Math.random()}`;
+ this.store.set(key, { value: { token }, expiresAt: now + ttl * 1000 });
try {
return await fn();
} finally {
- this.store.delete(key);
+ const current = this.store.get(key);
+ if (current?.value?.token === token) {
+ this.store.delete(key);
+ }
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```ts
const ttl = opts?.ttlSec && opts.ttlSec > 0 ? opts.ttlSec : 1;
const token = `${key}:${now}:${Math.random()}`;
this.store.set(key, { value: { token }, expiresAt: now + ttl * 1000 });
try {
  return await fn();
} finally {
  const current = this.store.get(key);
  if (current?.value?.token === token) {
    this.store.delete(key);
  }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test/helpers/fakeKV.ts` around lines 51 - 56, The finally block
unconditionally deletes the key and can remove a newer lock if the original lock
expired; fix by writing a unique lock identifier when acquiring the lock
(replace the literal token with a generated id), store that id (and/or the
expiresAt) in the set call, and in the finally handler read this.store.get(key)
and only call this.store.delete(key) if the current entry still matches the same
lock id (and optionally the same expiresAt) so you only remove the lock you own;
update the lock acquisition site (where this.store.set is called) and the
cleanup in the finally accordingly.
On test/pool.busy.test.ts:

```ts
test('acquire retries when channel in-use key appears between scan and claim', async () => {
  const kv = new FakeKV();
  const pool = new ChannelPool('testnet', kv as any, 30);

  await kv.set('testnet:channel:relayer-ids', { relayerIds: ['p1', 'p2'] });

  // p1 is locked (in-use key exists)
  await kv.set('testnet:channel:in-use:p1', { token: 'other', lockedAt: new Date().toISOString() }, { ttlSec: 30 });

  // p2 is free — should be acquired
  const lock = await pool.acquire(defaultOptions);
  expect(lock.relayerId).toBe('p2');
  await pool.release(lock);
```
This setup never reaches the “between scan and claim” path.
testnet:channel:in-use:p1 is written before pool.acquire(), so p1 is excluded during the initial read phase. The claim-phase double-check in src/plugin/pool.ts is never exercised by this test.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test/pool.busy.test.ts` around lines 31 - 43, The test writes the in-use key
before calling ChannelPool.acquire so it never exercises the "between scan and
claim" race; to fix it, arrange for the in-use key for 'p1' to be created after
the pool's initial scan but before its claim step (simulate the race). Modify
the test to either (a) instrument the FakeKV used by the pool so that
FakeKV.get/list/scan will only return no in-use for 'p1' on the first scan and
then insert the key (via FakeKV.set) when the pool next attempts to claim, or
(b) hook into ChannelPool.acquire by wrapping/monkey-patching FakeKV methods to
schedule a setImmediate/setTimeout that writes 'testnet:channel:in-use:p1' after
acquire has started scanning and before it performs the claim; reference FakeKV
(get/set/scan) and ChannelPool.acquire/claim to locate where to inject the
delayed set. Ensure the test still expects p2 to be acquired and released.
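One way to realize option (b) above is a KV with a one-shot hook that fires between the scan and the claim. This is a hypothetical sketch — RacyKV, setIfAbsent, and the key layout are assumptions, not the repo's real FakeKV/ChannelPool API — but it shows the shape of the race injection:

```typescript
class RacyKV {
  store = new Map<string, boolean>();
  /** Fires at most once, right after a scan returns — i.e. before the claim step. */
  afterScan: (() => void) | null = null;

  async listKeys(prefix: string): Promise<string[]> {
    // Capture the scan result first, then let the hook mutate the store:
    // this models another worker claiming a channel after our scan.
    const keys = [...this.store.keys()].filter((k) => k.startsWith(prefix));
    const hook = this.afterScan;
    this.afterScan = null;
    hook?.();
    return keys;
  }

  async setIfAbsent(key: string): Promise<boolean> {
    if (this.store.has(key)) return false; // claim-phase double-check catches the race
    this.store.set(key, true);
    return true;
  }
}

const PREFIX = 'testnet:channel:in-use:';

// Minimal acquire: the scan result may be stale; setIfAbsent is authoritative.
async function acquire(kv: RacyKV, ids: string[]): Promise<string | null> {
  const held = new Set((await kv.listKeys(PREFIX)).map((k) => k.slice(PREFIX.length)));
  for (const id of ids.filter((i) => !held.has(i))) {
    if (await kv.setIfAbsent(PREFIX + id)) return id;
  }
  return null;
}
```

With the hook installed, p1 looks free during the scan but the claim fails, so acquire falls through to p2 — exercising exactly the path the original test misses.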
LRU channel acquisition with per-channel claim locks
Replace the global mutex in channel pool acquire with per-channel claim locks and LRU-based channel selection. The global mutex serialized all acquire attempts across all workers — a bottleneck at scale (~2k channels). The new design uses a two-phase optimistic pattern:
Read phase (lock-free): parallel listKeys + single LRU map read to determine which channels are available and their usage order
Claim phase (per-channel lock): withLock on a per-channel claim key with double-check inside, falling through to the next candidate if busy
This means workers claiming different channels no longer block each other.
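The two phases can be sketched roughly as follows. This is a hypothetical minimal model — the KV interface, setIfAbsent, key names, and the in-memory store are assumptions for illustration, not the plugin's real API:

```typescript
interface KV {
  listKeys(prefix: string): Promise<string[]>;
  setIfAbsent(key: string, ttlMs: number): Promise<boolean>;
}

class MemKV implements KV {
  private store = new Map<string, number>(); // key -> expiresAt (ms)
  async listKeys(prefix: string): Promise<string[]> {
    const now = Date.now();
    return [...this.store].filter(([k, exp]) => k.startsWith(prefix) && exp > now).map(([k]) => k);
  }
  async setIfAbsent(key: string, ttlMs: number): Promise<boolean> {
    const now = Date.now();
    const exp = this.store.get(key);
    if (exp !== undefined && exp > now) return false; // claim already held
    this.store.set(key, now + ttlMs);
    return true;
  }
}

const LOCK_PREFIX = 'testnet:channel:in-use:';

async function acquire(kv: KV, ids: string[], lru: Record<string, number>): Promise<string | null> {
  // Read phase (lock-free): one scan for held claims plus the LRU timestamps.
  const held = new Set((await kv.listKeys(LOCK_PREFIX)).map((k) => k.slice(LOCK_PREFIX.length)));
  const candidates = ids
    .filter((id) => !held.has(id))
    .sort((a, b) => (lru[a] ?? 0) - (lru[b] ?? 0)); // least recently used first
  // Claim phase: atomic per-channel claim; fall through to the next candidate if busy.
  for (const id of candidates) {
    if (await kv.setIfAbsent(LOCK_PREFIX + id, 30_000)) {
      lru[id] = Date.now(); // best-effort LRU update
      return id;
    }
  }
  return null; // all channels busy
}
```

Because the only serialized step is the per-channel setIfAbsent, two workers contend only when they race for the same candidate, and the loser simply moves to its next LRU candidate.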
Other changes:
Fall back to per-channel exists checks when listKeys fails