From a7acc2dadc8fff679f00fa84d6b05d7f3da8b0fa Mon Sep 17 00:00:00 2001 From: Emma Jamieson-Hoare Date: Wed, 6 May 2026 11:33:58 +0100 Subject: [PATCH] fix: reduce snapshots viewer cold cache work --- apps/tempo-snapshots-viewer/src/index.ts | 296 +++++++++++++---------- 1 file changed, 168 insertions(+), 128 deletions(-) diff --git a/apps/tempo-snapshots-viewer/src/index.ts b/apps/tempo-snapshots-viewer/src/index.ts index 170d9b727..85bee010e 100644 --- a/apps/tempo-snapshots-viewer/src/index.ts +++ b/apps/tempo-snapshots-viewer/src/index.ts @@ -63,6 +63,7 @@ interface Snapshot { components?: SnapshotComponent[] manifestUrl?: string manifestKey?: string + presetSizes?: PresetSizes rawManifest?: SnapshotManifest } @@ -408,24 +409,6 @@ async function serveSnapshot( }) } -async function listAllObjects( - bucket: R2Bucket, - prefix?: string, -): Promise { - const objects: R2Object[] = [] - let cursor: string | undefined - while (true) { - const res = await withR2Retry( - `listing objects for prefix ${prefix || ''}`, - () => bucket.list({ cursor, prefix }), - ) - objects.push(...res.objects) - if (!res.truncated) break - cursor = res.cursor - } - return objects -} - // List top-level directories and root objects using R2 delimiter (paginated) async function listRoot( bucket: R2Bucket, @@ -447,26 +430,65 @@ async function listRoot( return { dirs, objects } } -// Check if all files referenced by a manifest exist in R2 -function isManifestComplete( - manifest: SnapshotManifest, - dirPrefix: string, - keySet: Set, -): boolean { - for (const [name, comp] of Object.entries(manifest.components)) { - if (isSingleArchive(comp)) { - if (!keySet.has(`${dirPrefix}/${comp.file}`)) return false - } else { - const numChunks = Math.ceil(comp.total_blocks / comp.blocks_per_file) - for (let i = 0; i < numChunks; i++) { - const start = i * comp.blocks_per_file - const end = (i + 1) * comp.blocks_per_file - 1 - const chunkKey = `${dirPrefix}/${name}-${start}-${end}.tar.zst` - if (!keySet.has(chunkKey)) return false - } +const SNAPSHOT_FETCH_CONCURRENCY = 8 + +async function mapWithConcurrency( + items: T[], + limit: number, + mapper: (item: T) => Promise, +): Promise { + const results = new Array(items.length) + const entries = items.map((item, index) => ({ index, item })) + let nextIndex = 0 + + async function worker(): Promise { + while (nextIndex < entries.length) { + const entry = entries[nextIndex] + nextIndex += 1 + if (!entry) break + results[entry.index] = await mapper(entry.item) } } - return true + + const workers = Array.from({ length: Math.min(limit, items.length) }, () => + worker(), + ) + await Promise.all(workers) + + return results +} + +function getComponentTerminalObjectKey( + dirPrefix: string, + name: string, + comp: ComponentManifest, +): string { + if (isSingleArchive(comp)) { + return `${dirPrefix}/${comp.file}` + } + + const numChunks = Math.ceil(comp.total_blocks / comp.blocks_per_file) + const lastChunkIndex = Math.max(0, numChunks - 1) + const start = lastChunkIndex * comp.blocks_per_file + const end = (lastChunkIndex + 1) * comp.blocks_per_file - 1 + return `${dirPrefix}/${name}-${start}-${end}.tar.zst` +} + +async function isManifestLikelyComplete( + bucket: R2Bucket, + manifest: SnapshotManifest, + dirPrefix: string, +): Promise { + const terminalKeys = Object.entries(manifest.components).map(([name, comp]) => + getComponentTerminalObjectKey(dirPrefix, name, comp), + ) + const heads = await Promise.all( + terminalKeys.map((key) => + withR2Retry(`checking manifest component ${key}`, () => bucket.head(key)), + ), + ) + + return heads.every(Boolean) } // Fetch and parse all snapshots from R2 @@ -480,78 +502,83 @@ async function getSnapshots(env: Env): Promise { obj.key.endsWith('.json'), ) - // Fetch manifests in parallel — only need to get manifest.json from each dir - const manifestPromises = dirs.map(async (dir): Promise => { - const dirName = dir.replace(/\/$/, '') - const manifestKey = `${dirName}/manifest.json` + // Fetch manifests in bounded parallelism. Keep this request path compact: + // listing every chunk in every snapshot can exceed Worker CPU limits. + const manifestResults = await mapWithConcurrency( + dirs, + SNAPSHOT_FETCH_CONCURRENCY, + async (dir): Promise => { + const dirName = dir.replace(/\/$/, '') + const manifestKey = `${dirName}/manifest.json` - try { - const obj = await withR2Retry(`fetching manifest ${manifestKey}`, () => - env.SNAPSHOTS.get(manifestKey), - ) - if (!obj) return null - - const manifest: SnapshotManifest = await obj.json() + try { + const obj = await withR2Retry(`fetching manifest ${manifestKey}`, () => + env.SNAPSHOTS.get(manifestKey), + ) + if (!obj) return null - // List objects in this directory to verify completeness - const dirObjects = await listAllObjects(env.SNAPSHOTS, dir) - const keySet = new Set(dirObjects.map((o) => o.key)) + const manifest: SnapshotManifest = await obj.json() - if (!isManifestComplete(manifest, dirName, keySet)) { - console.warn(`Skipping incomplete snapshot: ${manifestKey}`) - return null - } + if ( + !(await isManifestLikelyComplete(env.SNAPSHOTS, manifest, dirName)) + ) { + console.warn(`Skipping incomplete snapshot: ${manifestKey}`) + return null + } - const chainId = String(manifest.chain_id) - const network = getNetworkInfo(chainId) - const baseUrl = `${env.R2_PUBLIC_URL}/${dirName}` + const chainId = String(manifest.chain_id) + const network = getNetworkInfo(chainId) + const baseUrl = `${env.R2_PUBLIC_URL}/${dirName}` - const date = new Date(manifest.timestamp * 1000) - .toISOString() - .split('T')[0] + const date = new Date(manifest.timestamp * 1000) + .toISOString() + .split('T')[0] - const components: SnapshotComponent[] = [] - let totalSize = 0 + const components: SnapshotComponent[] = [] + let totalSize = 0 - for (const [name, comp] of Object.entries(manifest.components)) { - const displayName = COMPONENT_DISPLAY_NAMES[name] || name - const size = getComponentSize(comp) - components.push({ name, displayName, size }) - totalSize += size - } + for (const [name, comp] of Object.entries(manifest.components)) { + const displayName = COMPONENT_DISPLAY_NAMES[name] || name + const size = getComponentSize(comp) + components.push({ name, displayName, size }) + totalSize += size + } - const manifestUrl = `${baseUrl}/manifest.json` - return { - snapshotId: manifestUrl, - chainId, - networkKey: network.key, - networkName: network.name, - block: manifest.block, - timestamp: String(manifest.timestamp), - date, - image: - manifest.tempo_version || - manifest.reth_version || - manifest.image || - 'unknown', - archiveUrl: manifestUrl, - archiveFile: manifestKey, - metadataUrl: `${env.R2_PUBLIC_URL}/${manifestKey}`, - size: totalSize, - isModular: true, - components, - manifestUrl, - manifestKey, - rawManifest: manifest, + const manifestUrl = `${baseUrl}/manifest.json` + return { + snapshotId: manifestUrl, + chainId, + networkKey: network.key, + networkName: network.name, + block: manifest.block, + timestamp: String(manifest.timestamp), + date, + image: + manifest.tempo_version || + manifest.reth_version || + manifest.image || + 'unknown', + archiveUrl: manifestUrl, + archiveFile: manifestKey, + metadataUrl: `${env.R2_PUBLIC_URL}/${manifestKey}`, + size: totalSize, + isModular: true, + components, + manifestUrl, + manifestKey, + presetSizes: getPresetSizesFromManifest(manifest), + } + } catch (err) { + console.error(`Failed to parse manifest ${manifestKey}:`, err) + return null } - } catch (err) { - console.error(`Failed to parse manifest ${manifestKey}:`, err) - return null - } - }) + }, + ) - // Fetch legacy metadata in parallel - const legacyPromises = legacyMetadataFiles.map( + // Fetch legacy metadata in bounded parallelism. + const legacyResults = await mapWithConcurrency( + legacyMetadataFiles, + SNAPSHOT_FETCH_CONCURRENCY, async (file): Promise => { try { const obj = await withR2Retry( @@ -605,11 +632,6 @@ async function getSnapshots(env: Env): Promise { }, ) - const [manifestResults, legacyResults] = await Promise.all([ - Promise.all(manifestPromises), - Promise.all(legacyPromises), - ]) - const snapshots = [ ...manifestResults.filter((s): s is Snapshot => s !== null), ...legacyResults.filter((s): s is Snapshot => s !== null), @@ -622,13 +644,15 @@ async function getSnapshots(env: Env): Promise { return snapshots } -const CACHE_KEY_FULL = 'https://snapshots.tempoxyz.dev/cache/full' -const CACHE_KEY_API = 'https://snapshots.tempoxyz.dev/cache/api' +const CACHE_KEY_FULL = 'https://snapshots.tempoxyz.dev/cache/v2/full' +const CACHE_KEY_API = 'https://snapshots.tempoxyz.dev/cache/v2/api' +const CACHE_KEY_UI_HTML = 'https://snapshots.tempoxyz.dev/cache/v2/ui-html' const CACHE_TTL = 3600 // 1 hour — snapshots change at most once per day +let snapshotRefreshPromise: Promise | undefined -// Strip rawManifest (contains huge chunk_sizes arrays) from snapshots for API -function stripRawManifests(snapshots: Snapshot[]): Snapshot[] { - return snapshots.map(({ rawManifest, ...rest }) => rest) +// Strip UI-only fields from snapshots for API responses. +function stripSnapshotInternals(snapshots: Snapshot[]): Snapshot[] { + return snapshots.map(({ presetSizes, rawManifest, ...rest }) => rest) } // Populate both caches from a fresh snapshot list @@ -642,19 +666,22 @@ async function populateSnapshotCaches( 'Cache-Control': `public, max-age=${CACHE_TTL}`, }, }) - const stripped = new Response(JSON.stringify(stripRawManifests(snapshots)), { - headers: { - 'Content-Type': 'application/json', - 'Cache-Control': `public, max-age=${CACHE_TTL}`, + const stripped = new Response( + JSON.stringify(stripSnapshotInternals(snapshots)), + { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': `public, max-age=${CACHE_TTL}`, + }, }, - }) + ) await Promise.all([ cache.put(CACHE_KEY_FULL, full), cache.put(CACHE_KEY_API, stripped), ]) } -// Canonical source: always returns full snapshots with rawManifest +// Canonical source: returns UI snapshots with precomputed preset sizes. async function getFullSnapshots(env: Env): Promise { const cache = caches.default const cached = await cache.match(CACHE_KEY_FULL) @@ -662,9 +689,16 @@ async function getFullSnapshots(env: Env): Promise { return cached.json() } - const snapshots = await getSnapshots(env) - await populateSnapshotCaches(cache, snapshots) - return snapshots + snapshotRefreshPromise ??= getSnapshots(env) + .then(async (snapshots) => { + await populateSnapshotCaches(cache, snapshots) + return snapshots + }) + .finally(() => { + snapshotRefreshPromise = undefined + }) + + return snapshotRefreshPromise } // For UI rendering — alias for getFullSnapshots @@ -672,7 +706,7 @@ async function getCachedSnapshotsForUI(env: Env): Promise { return getFullSnapshots(env) } -// Handle API requests — returns stripped snapshots (no rawManifest/chunk_sizes) +// Handle API requests — returns stripped snapshots without UI-only sizing data. async function handleAPI(_req: Request, env: Env): Promise { const cache = caches.default const cached = await cache.match(CACHE_KEY_API) @@ -687,7 +721,7 @@ async function handleAPI(_req: Request, env: Env): Promise { // Populates both caches, return stripped const snapshots = await getFullSnapshots(env) - const stripped = stripRawManifests(snapshots) + const stripped = stripSnapshotInternals(snapshots) return new Response(JSON.stringify(stripped), { headers: { 'Content-Type': 'application/json', @@ -700,9 +734,7 @@ async function handleAPI(_req: Request, env: Env): Promise { async function handleUI(_req: Request, env: Env) { // Serve cached HTML from CF edge cache to avoid re-rendering const cache = caches.default - const cacheKey = new Request('https://snapshots.tempoxyz.dev/ui-html', { - method: 'GET', - }) + const cacheKey = new Request(CACHE_KEY_UI_HTML, { method: 'GET' }) const cachedHtml = await cache.match(cacheKey) if (cachedHtml) { return cachedHtml @@ -750,7 +782,16 @@ async function handleUI(_req: Request, env: Env) { snapshots.filter((snapshot) => snapshot.chainId === chainId), ]), ) as Record - const modularSnapshots = snapshots.filter((s) => s.isModular && s.rawManifest) + const getSnapshotPresetSizes = ( + snapshot: Snapshot, + ): PresetSizes | undefined => + snapshot.presetSizes || + (snapshot.rawManifest + ? getPresetSizesFromManifest(snapshot.rawManifest) + : undefined) + const modularSnapshots = snapshots.filter( + (s) => s.isModular && getSnapshotPresetSizes(s), + ) const modularSnapshotsByChain = Object.fromEntries( chainIds.map((chainId) => [ chainId, @@ -759,9 +800,8 @@ async function handleUI(_req: Request, env: Env) { ) as Record const snapshotPresetSizes: Record = {} for (const s of modularSnapshots) { - snapshotPresetSizes[s.snapshotId] = s.rawManifest - ? getPresetSizesFromManifest(s.rawManifest) - : defaultPresetSizes + snapshotPresetSizes[s.snapshotId] = + getSnapshotPresetSizes(s) || defaultPresetSizes } const selectedChainId = chainIds.includes(DEFAULT_CHAIN_ID) ? DEFAULT_CHAIN_ID @@ -769,8 +809,8 @@ async function handleUI(_req: Request, env: Env) { const selectedNetworkSnapshots = modularSnapshotsByChain[selectedChainId] || [] const latestModular = selectedNetworkSnapshots[0] - const presetSizes = latestModular?.rawManifest - ? getPresetSizesFromManifest(latestModular.rawManifest) + const presetSizes = latestModular + ? snapshotPresetSizes[latestModular.snapshotId] || defaultPresetSizes : defaultPresetSizes const networkOptions = chainIds.map((chainId) => { const network = getNetworkInfo(chainId)