Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 304 additions & 0 deletions src/app/api/cron/backfill/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
import { NextResponse } from "next/server";
import { decodeEventLog, type Log } from "viem";
import { publicClient } from "../../../../../lib/viem";
import { createServerClient } from "../../../../../lib/supabase";
import {
  storyFactoryAbi,
  plotChainedEvent,
  storylineCreatedEvent,
  donationEvent,
} from "../../../../../lib/contracts/abi";
import { STORY_FACTORY } from "../../../../../lib/contracts/constants";
import { hashContent } from "../../../../../lib/content";
import { detectWriterType } from "../../../../../lib/contracts/erc8004";
import type { Database } from "../../../../../lib/supabase";

/** Public IPFS gateway used to resolve plot content by CID. */
const IPFS_GATEWAY = "https://ipfs.filebase.io/ipfs/";
/** Abort an IPFS fetch after this long so one slow gateway call cannot stall the cron run. */
const IPFS_TIMEOUT_MS = 10_000;

/**
 * How many blocks to scan per cron run (~5 min on Base = ~150 blocks at 2s/block).
 * Slightly over-scan to handle timing variance.
 */
const SCAN_BLOCKS = BigInt(200);

/** Cron authorization — set CRON_SECRET env var to protect this endpoint */
function verifyCron(req: Request): boolean {
  const secret = process.env.CRON_SECRET;
  // No secret configured means the endpoint is deliberately left open (dev mode).
  if (!secret) return true;
  const expected = `Bearer ${secret}`;
  return req.headers.get("authorization") === expected;
}

/**
 * Fetch plot text from the IPFS gateway by CID.
 * Resolves to null on any failure (non-2xx status, timeout, network error)
 * so callers can skip unavailable content instead of failing the whole run.
 */
async function fetchIPFSContent(cid: string): Promise<string | null> {
  const url = `${IPFS_GATEWAY}${cid}`;
  try {
    const response = await fetch(url, {
      signal: AbortSignal.timeout(IPFS_TIMEOUT_MS),
    });
    return response.ok ? await response.text() : null;
  } catch {
    return null;
  }
}

/** Resolve a block number to its timestamp as an ISO-8601 string. */
async function getBlockTimestamp(blockNumber: bigint): Promise<string> {
  const { timestamp } = await publicClient.getBlock({ blockNumber });
  const millis = Number(timestamp) * 1000;
  return new Date(millis).toISOString();
}

// Row shapes for inserts, derived from the generated Supabase schema types.
type PlotInsert = Database["public"]["Tables"]["plots"]["Insert"];
type StorylineInsert = Database["public"]["Tables"]["storylines"]["Insert"];
type DonationInsert = Database["public"]["Tables"]["donations"]["Insert"];

/**
 * GET /api/cron/backfill
 *
 * Cron-driven indexer: scans StoryFactory logs from the block after the
 * persisted cursor, capped at SCAN_BLOCKS per run, decodes
 * StorylineCreated / PlotChained / Donation events, mirrors them into
 * Supabase, then advances the cursor. Protected by CRON_SECRET (verifyCron).
 *
 * Returns JSON describing the scanned range, per-event insert counts, and
 * the number of logs that failed to process.
 */
export async function GET(req: Request) {
  if (!verifyCron(req)) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const supabase = createServerClient();
  if (!supabase) {
    return NextResponse.json(
      { error: "Supabase not configured" },
      { status: 500 }
    );
  }

  // Skip if StoryFactory not yet deployed
  if (STORY_FACTORY === "0x0000000000000000000000000000000000000000") {
    return NextResponse.json({
      skipped: true,
      reason: "StoryFactory not deployed yet",
    });
  }

  const currentBlock = await publicClient.getBlockNumber();

  // Read last processed block from the persistent cursor (singleton row id=1).
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const { data: cursor } = await (supabase.from("backfill_cursor") as any)
    .select("last_block")
    .eq("id", 1)
    .single();
  const lastBlock = cursor?.last_block ? BigInt(cursor.last_block) : BigInt(0);

  // Start from block after last processed; block 0 on the very first run.
  const fromBlock = lastBlock > BigInt(0) ? lastBlock + BigInt(1) : BigInt(0);

  if (fromBlock > currentBlock) {
    return NextResponse.json({ skipped: true, reason: "Already up to date" });
  }

  // Cap scan range per run to avoid timeouts on large backlogs
  const toBlock =
    fromBlock + SCAN_BLOCKS < currentBlock
      ? fromBlock + SCAN_BLOCKS
      : currentBlock;

  // Fetch all StoryFactory logs in the scan range
  const logs = await publicClient.getLogs({
    address: STORY_FACTORY,
    fromBlock,
    toBlock,
  });

  let storylinesInserted = 0;
  let plotsInserted = 0;
  let donationsInserted = 0;
  let errors = 0;

  // Cache block timestamps to avoid redundant RPC calls — many logs in a
  // scan share the same block.
  const blockTimestampCache = new Map<bigint, string>();
  async function getCachedBlockTimestamp(blockNumber: bigint): Promise<string> {
    const cached = blockTimestampCache.get(blockNumber);
    if (cached) return cached;
    const ts = await getBlockTimestamp(blockNumber);
    blockTimestampCache.set(blockNumber, ts);
    return ts;
  }

  for (const log of logs) {
    try {
      const decoded = decodeEventLog({
        abi: storyFactoryAbi,
        data: log.data,
        topics: log.topics,
      });

      const txHash = log.transactionHash!.toLowerCase();
      const logIndex = log.logIndex!;

      if (decoded.eventName === "StorylineCreated") {
        await processStorylineCreated(
          decoded,
          log,
          txHash,
          logIndex,
          supabase,
          getCachedBlockTimestamp
        );
        storylinesInserted++;
      } else if (decoded.eventName === "PlotChained") {
        await processPlotChained(
          decoded,
          log,
          txHash,
          logIndex,
          supabase,
          getCachedBlockTimestamp
        );
        plotsInserted++;
      } else if (decoded.eventName === "Donation") {
        await processDonation(
          decoded,
          log,
          txHash,
          logIndex,
          supabase,
          getCachedBlockTimestamp
        );
        donationsInserted++;
      }
    } catch {
      // Undecodable log or transient RPC/DB failure — count and continue so
      // one bad log cannot stall the whole scan.
      errors++;
    }
  }

  // Persist cursor — advance to highest block actually scanned.
  // Use upsert (not update): if the migration's seed row is missing, a bare
  // update matches zero rows, the cursor never advances, and every run would
  // rescan the same range from block 0 forever.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  await (supabase.from("backfill_cursor") as any).upsert({
    id: 1,
    last_block: Number(toBlock),
    updated_at: new Date().toISOString(),
  });

  return NextResponse.json({
    scanned: { fromBlock: Number(fromBlock), toBlock: Number(toBlock) },
    processed: {
      storylines: storylinesInserted,
      plots: plotsInserted,
      donations: donationsInserted,
    },
    errors,
  });
}

// Decoded viem event log. Typed loosely because each handler destructures a
// different args shape; precise typing would need per-event generics.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DecodedEvent = any;
// Supabase client, typed loosely so helpers can call .from() without
// threading the generated client type through every signature.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type SupabaseClient = any;

/**
 * Handle a StorylineCreated event: upsert the storyline row, then insert the
 * genesis plot (plot_index 0) when the opening content can be fetched from
 * IPFS and its hash matches the on-chain commitment.
 */
async function processStorylineCreated(
  decoded: DecodedEvent,
  log: Log,
  txHash: string,
  logIndex: number,
  supabase: SupabaseClient,
  getTimestamp: (blockNumber: bigint) => Promise<string>
) {
  const args = decoded.args;

  const timestampISO = await getTimestamp(log.blockNumber!);
  const writerType = await detectWriterType(args.writer);

  const storyline: StorylineInsert = {
    storyline_id: Number(args.storylineId),
    writer_address: args.writer.toLowerCase(),
    token_address: args.tokenAddress.toLowerCase(),
    title: args.title,
    plot_count: 1,
    has_deadline: args.hasDeadline,
    writer_type: writerType,
    last_plot_time: timestampISO,
    block_timestamp: timestampISO,
    tx_hash: txHash,
    log_index: logIndex,
  };

  await supabase
    .from("storylines")
    .upsert(storyline, { onConflict: "tx_hash,log_index" });

  // Genesis plot is stored only when the IPFS content is reachable and its
  // hash matches the on-chain commitment.
  const opening = await fetchIPFSContent(args.openingCID);
  if (opening === null || hashContent(opening) !== args.openingHash) return;

  const genesisPlot: PlotInsert = {
    storyline_id: Number(args.storylineId),
    plot_index: 0,
    writer_address: args.writer.toLowerCase(),
    content: opening,
    content_cid: args.openingCID,
    content_hash: args.openingHash as string,
    block_timestamp: timestampISO,
    tx_hash: txHash,
    log_index: logIndex,
  };
  await supabase
    .from("plots")
    .upsert(genesisPlot, { onConflict: "tx_hash,log_index" });
}

/**
 * Handle a PlotChained event: fetch the plot body from IPFS, verify it
 * against the on-chain content hash, and upsert the plot row. Unreachable or
 * mismatching content is skipped (a later run may pick it up).
 */
async function processPlotChained(
  decoded: DecodedEvent,
  log: Log,
  txHash: string,
  logIndex: number,
  supabase: SupabaseClient,
  getTimestamp: (blockNumber: bigint) => Promise<string>
) {
  const { storylineId, plotIndex, writer, contentCID, contentHash } =
    decoded.args;

  const body = await fetchIPFSContent(contentCID);
  // Skip when the gateway cannot serve the CID, or the fetched bytes do not
  // match the hash committed on-chain.
  if (body === null || hashContent(body) !== contentHash) return;

  const plot: PlotInsert = {
    storyline_id: Number(storylineId),
    plot_index: Number(plotIndex),
    writer_address: writer.toLowerCase(),
    content: body,
    content_cid: contentCID,
    content_hash: contentHash as string,
    block_timestamp: await getTimestamp(log.blockNumber!),
    tx_hash: txHash,
    log_index: logIndex,
  };

  await supabase.from("plots").upsert(plot, { onConflict: "tx_hash,log_index" });
}

/** Handle a Donation event: record donor, amount, and block time. */
async function processDonation(
  decoded: DecodedEvent,
  log: Log,
  txHash: string,
  logIndex: number,
  supabase: SupabaseClient,
  getTimestamp: (blockNumber: bigint) => Promise<string>
) {
  const { storylineId, donor, amount } = decoded.args;

  const donation: DonationInsert = {
    storyline_id: Number(storylineId),
    donor_address: donor.toLowerCase(),
    // Stored as a string to avoid precision loss on large token amounts.
    amount: amount.toString(),
    block_timestamp: await getTimestamp(log.blockNumber!),
    tx_hash: txHash,
    log_index: logIndex,
  };

  await supabase
    .from("donations")
    .upsert(donation, { onConflict: "tx_hash,log_index" });
}
10 changes: 10 additions & 0 deletions supabase/migrations/00003_backfill_cursor.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Persistent cursor for cron backfill — tracks last processed block.
-- Persistent cursor for cron backfill — tracks last processed block.
-- Singleton table: the check constraint pins id to 1, so there is exactly
-- one cursor row, advanced in place by the backfill route after each scan.
create table if not exists backfill_cursor (
  id integer primary key default 1 check (id = 1), -- singleton row
  last_block bigint not null default 0,
  updated_at timestamptz not null default now()
);

-- Seed the singleton row
insert into backfill_cursor (id, last_block) values (1, 0)
on conflict (id) do nothing;
8 changes: 8 additions & 0 deletions vercel.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"crons": [
{
"path": "/api/cron/backfill",
"schedule": "*/5 * * * *"
}
]
}
Loading