Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions src/polling/pollScraperResults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@ export type PollResult = ScrapeRun & {
data?: unknown[];
};

// Number of consecutive failed polls (no result returned) after which a run is
// marked as FAILED; the per-run counter is cleared whenever a poll succeeds.
const MAX_POLL_FAILURES = 5;

/**
 * Creates the PollResult that represents a failed run.
 *
 * Only the identifiers are carried over from the run; the status is always
 * set to "FAILED" regardless of the status on the input, and no `data` is attached.
 *
 * @param run - The scrape run to report as failed.
 * @returns A PollResult with the run's `runId` and `datasetId` and status "FAILED".
 */
function failedResult(run: ScrapeRun): PollResult {
  const { runId, datasetId } = run;
  return { runId, datasetId, status: "FAILED" };
}

/**
* Polls each scraper run in parallel until all are completed (SUCCEEDED or FAILED).
* Marks a run as FAILED after MAX_POLL_FAILURES consecutive poll errors.
* Returns an array of results for each run.
*/
export async function pollScraperResults(
Expand All @@ -23,17 +33,34 @@ export async function pollScraperResults(
const pendingRuns = new Map<string, ScrapeRun>(
runs.map((run) => [run.runId, run])
);
const failureCounts = new Map<string, number>();

while (pendingRuns.size > 0) {
// Poll all pending runs in parallel
const pollPromises = Array.from(pendingRuns.values()).map(async (run) => {
const result = await getScraperResults(run.runId);

if (!result) {
logger.warn("Failed to get scraper result", { runId: run.runId });
const failures = (failureCounts.get(run.runId) ?? 0) + 1;
failureCounts.set(run.runId, failures);
logger.warn("Failed to get scraper result", {
runId: run.runId,
consecutiveFailures: failures,
});

if (failures >= MAX_POLL_FAILURES) {
logger.error("Max poll failures reached, marking run as FAILED", {
runId: run.runId,
});
return { run, pollResult: failedResult(run) };
}

return null;
}

// Reset failure count on successful poll
failureCounts.delete(run.runId);

if (result.status === "SUCCEEDED") {
const completedResult = result as {
status: string;
Expand All @@ -47,17 +74,10 @@ export async function pollScraperResults(
datasetId: completedResult.datasetId,
status: completedResult.status,
data: completedResult.data,
},
} as PollResult,
};
} else if (result.status === "FAILED") {
return {
run,
pollResult: {
runId: run.runId,
datasetId: result.datasetId,
status: result.status,
},
};
return { run, pollResult: failedResult(run) };
}

return null; // Still running
Expand Down