From bd7f5c5239860bc8ec54336276fe787e6d29e4d1 Mon Sep 17 00:00:00 2001 From: DevSolex Date: Fri, 27 Mar 2026 08:41:06 +0100 Subject: [PATCH] feat: add parallel asset price fetching with configurable concurrency (#45) - Refactor fetchPrices() to use Promise.allSettled with batched concurrency - Add optional concurrencyLimit to ProviderConfig (defaults to 5) - Failed fetches are isolated and logged without blocking successful ones - Rate limiting is preserved across parallel batches - Add parallel-fetching.test.ts with 5 tests covering all acceptance criteria --- oracle/src/providers/base-provider.ts | 26 ++++-- oracle/src/types/index.ts | 1 + oracle/tests/parallel-fetching.test.ts | 122 +++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 9 deletions(-) create mode 100644 oracle/tests/parallel-fetching.test.ts diff --git a/oracle/src/providers/base-provider.ts b/oracle/src/providers/base-provider.ts index cc87cea0..46fec3d1 100644 --- a/oracle/src/providers/base-provider.ts +++ b/oracle/src/providers/base-provider.ts @@ -68,19 +68,27 @@ export abstract class BasePriceProvider { abstract fetchPrice(asset: string): Promise; /** - * Fetch prices for multiple assets - * Can be overridden for batch API calls + * Fetch prices for multiple assets in parallel with a concurrency limit. + * Failed fetches are logged and skipped without blocking successful ones. */ async fetchPrices(assets: string[]): Promise { + const concurrency = this.config.concurrencyLimit ?? 5; const results: RawPriceData[] = []; - for (const asset of assets) { - try { - await this.enforceRateLimit(); - const price = await this.fetchPrice(asset); - results.push(price); - } catch (error) { - logger.error(`Failed to fetch ${asset} from ${this.name}`, { error }); + for (let i = 0; i < assets.length; i += concurrency) { + const batch = assets.slice(i, i + concurrency); + const settled = await Promise.allSettled( + batch.map(async (asset) => { + await this.enforceRateLimit(); + return this.fetchPrice(asset); + }) + ); + for (const outcome of settled) { + if (outcome.status === 'fulfilled') { + results.push(outcome.value); + } else { + logger.error(`Failed to fetch price from ${this.name}`, { error: outcome.reason }); + } } } diff --git a/oracle/src/types/index.ts b/oracle/src/types/index.ts index 210f5be5..b50e0169 100644 --- a/oracle/src/types/index.ts +++ b/oracle/src/types/index.ts @@ -81,6 +81,7 @@ export interface ProviderConfig { maxRequests: number; windowMs: number; }; + concurrencyLimit?: number; } /** diff --git a/oracle/tests/parallel-fetching.test.ts b/oracle/tests/parallel-fetching.test.ts new file mode 100644 index 00000000..bdf9790b --- /dev/null +++ b/oracle/tests/parallel-fetching.test.ts @@ -0,0 +1,122 @@ +/** + * Tests for parallel asset price fetching in BasePriceProvider.fetchPrices() + */ + +import { describe, it, expect, vi } from 'vitest'; +import { BasePriceProvider } from '../src/providers/base-provider.js'; +import type { RawPriceData } from '../src/types/index.js'; + +function makeProvider(concurrencyLimit: number, maxRequests = 100, windowMs = 1000) { + return new (class extends BasePriceProvider { + public callLog: { asset: string; startedAt: number }[] = []; + + async fetchPrice(asset: string): Promise { + this.callLog.push({ asset, startedAt: Date.now() }); + return { asset, price: 1, timestamp: Math.floor(Date.now() / 1000), source: this.name }; + } + })({ + name: 'parallel-test', + enabled: true, + priority: 1, + weight: 1, + baseUrl: 'https://mock.api', + rateLimit: { maxRequests, windowMs }, + concurrencyLimit, + }); +} + +describe('fetchPrices parallel execution', () => { + it('returns results for all assets', async () => { + const provider = makeProvider(5); + const assets = ['XLM', 'BTC', 'ETH', 'USDC', 'USDT']; + const results = await provider.fetchPrices(assets); + expect(results).toHaveLength(5); + expect(results.map((r) => r.asset)).toEqual(expect.arrayContaining(assets)); + }); + + it('fetches up to concurrencyLimit assets in parallel per batch', async () => { + vi.useFakeTimers(); + const concurrency = 3; + const provider = new (class extends BasePriceProvider { + public inFlight = 0; + public maxObservedInFlight = 0; + + async fetchPrice(asset: string): Promise { + this.inFlight++; + this.maxObservedInFlight = Math.max(this.maxObservedInFlight, this.inFlight); + await new Promise((r) => setTimeout(r, 10)); + this.inFlight--; + return { asset, price: 1, timestamp: 0, source: this.name }; + } + })({ + name: 'concurrency-test', + enabled: true, + priority: 1, + weight: 1, + baseUrl: 'https://mock.api', + rateLimit: { maxRequests: 100, windowMs: 1000 }, + concurrencyLimit: concurrency, + }); + + const promise = provider.fetchPrices(['A', 'B', 'C', 'D', 'E', 'F']); + await vi.runAllTimersAsync(); + const results = await promise; + + expect(results).toHaveLength(6); + expect(provider.maxObservedInFlight).toBeLessThanOrEqual(concurrency); + vi.useRealTimers(); + }); + + it('does not block on failed fetches — successful ones still return', async () => { + const provider = new (class extends BasePriceProvider { + async fetchPrice(asset: string): Promise { + if (asset === 'FAIL') throw new Error('provider error'); + return { asset, price: 1, timestamp: 0, source: this.name }; + } + })({ + name: 'failure-test', + enabled: true, + priority: 1, + weight: 1, + baseUrl: 'https://mock.api', + rateLimit: { maxRequests: 100, windowMs: 1000 }, + concurrencyLimit: 5, + }); + + const results = await provider.fetchPrices(['XLM', 'FAIL', 'BTC']); + expect(results).toHaveLength(2); + expect(results.map((r) => r.asset)).toEqual(expect.arrayContaining(['XLM', 'BTC'])); + }); + + it('defaults to concurrency of 5 when concurrencyLimit is not set', async () => { + const provider = new (class extends BasePriceProvider { + async fetchPrice(asset: string): Promise { + return { asset, price: 1, timestamp: 0, source: this.name }; + } + })({ + name: 'default-concurrency-test', + enabled: true, + priority: 1, + weight: 1, + baseUrl: 'https://mock.api', + rateLimit: { maxRequests: 100, windowMs: 1000 }, + // concurrencyLimit intentionally omitted + }); + + const results = await provider.fetchPrices(['A', 'B', 'C', 'D', 'E', 'F']); + expect(results).toHaveLength(6); + }); + + it('respects rate limiting during parallel fetches', async () => { + const maxRequests = 3; + const windowMs = 100; + const provider = makeProvider(5, maxRequests, windowMs); + + const start = Date.now(); + await provider.fetchPrices(['A', 'B', 'C', 'D', 'E', 'F']); + const elapsed = Date.now() - start; + + // 6 requests with max 3 per 100ms window must span at least one window + expect(elapsed).toBeGreaterThanOrEqual(windowMs); + }); +});