diff --git a/lib/notifications.server.ts b/lib/notifications.server.ts index a64a7a43..1ad22a23 100644 --- a/lib/notifications.server.ts +++ b/lib/notifications.server.ts @@ -1,11 +1,19 @@ /** - * [#489] Farcaster notification system for PlotLink. + * [#489, #521] Farcaster notification system for PlotLink. * * Handles notification token storage (Supabase) and sending push * notifications to Farcaster clients via the miniapp notification API. + * + * [#521] Targeted notifications: new plot notifications go only to + * storyline token holders. Price change alerts (>10%) sent to holders. */ import { createClient } from "@supabase/supabase-js"; +import { createPublicClient, http, erc20Abi, type Address } from "viem"; +import { base } from "viem/chains"; +import { STORY_FACTORY } from "./contracts/constants"; + +const NEYNAR_BASE = "https://api.neynar.com/v2/farcaster"; const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL || ""; const supabaseServiceKey = process.env.SUPABASE_SERVICE_ROLE_KEY || ""; @@ -16,6 +24,13 @@ function getSupabase() { }); } +function getRpcClient() { + return createPublicClient({ + chain: base, + transport: http(process.env.NEXT_PUBLIC_RPC_URL), + }); +} + export interface NotificationToken { fid: number; notificationToken: string; @@ -32,17 +47,24 @@ export async function saveUserNotificationToken( ): Promise { const supabase = getSupabase(); - const { error } = await supabase.from("notification_tokens").upsert( - { - fid, - notification_token: token, - notification_url: url, - client_app_fid: clientAppFid || null, - enabled: true, - updated_at: new Date().toISOString(), - }, - { onConflict: "fid" }, - ); + // Resolve wallet from FID via trusted Neynar API + const walletAddress = await resolveWalletForFid(fid); + + const row: Record = { + fid, + notification_token: token, + notification_url: url, + client_app_fid: clientAppFid || null, + enabled: true, + updated_at: new Date().toISOString(), + }; + if (walletAddress) { + row.wallet_address = walletAddress; + } + + const { error } = await supabase + .from("notification_tokens") + .upsert(row, { onConflict: "fid" }); if (error) { console.error("Failed to save notification token:", error); @@ -83,6 +105,79 @@ export async function getEnabledTokens(): Promise { })); } +// ---- [#521] FID → Wallet Resolution (trusted) ---- + +/** + * Resolve a Farcaster FID to its verified Ethereum address via Neynar API. + * Returns the first verified address, or null if unavailable. + */ +async function resolveWalletForFid(fid: number): Promise { + const apiKey = process.env.NEYNAR_API_KEY; + if (!apiKey) return null; + + try { + const res = await fetch(`${NEYNAR_BASE}/user/bulk?fids=${fid}`, { + headers: { accept: "application/json", "x-api-key": apiKey }, + signal: AbortSignal.timeout(3000), + }); + if (!res.ok) return null; + const json = await res.json(); + const user = json.users?.[0]; + if (!user) return null; + + // Use first verified Ethereum address + const verifiedAddress = user.verified_addresses?.eth_addresses?.[0]; + return verifiedAddress?.toLowerCase() ?? null; + } catch { + return null; + } +} + +// ---- [#521] Token Holder Targeting ---- + +/** + * Get notification tokens for users who hold a specific storyline token. + * Queries enabled tokens with wallet_address, then checks on-chain balanceOf. + */ +async function getTokenHolderTokens( + tokenAddress: string, +): Promise { + const supabase = getSupabase(); + const rpc = getRpcClient(); + + // Get all enabled tokens that have a wallet_address + const { data, error } = await supabase + .from("notification_tokens") + .select("*") + .eq("enabled", true) + .not("wallet_address", "is", null); + + if (error || !data || data.length === 0) return []; + + // Check on-chain balances via multicall + const balanceResults = await rpc.multicall({ + contracts: data.map((row) => ({ + address: tokenAddress as Address, + abi: erc20Abi, + functionName: "balanceOf" as const, + args: [row.wallet_address as Address], + })), + allowFailure: true, + }); + + // Filter to holders (balance > 0) + return data + .filter((_, i) => { + const result = balanceResults[i]; + return result.status === "success" && (result.result as bigint) > BigInt(0); + }) + .map((row) => ({ + fid: row.fid, + notificationToken: row.notification_token, + notificationUrl: row.notification_url, + })); +} + // ---- Notification Sending ---- export async function sendNotification(params: { @@ -182,19 +277,31 @@ export async function notifyNewStoryline( } /** - * Notify all users with enabled notifications about a new plot. - * Called from the backfill cron when a new plot is indexed. + * [#521] Notify token holders about a new plot in a storyline they hold. + * Falls back to all users if no token address or no holders found. */ export async function notifyNewPlot( storylineId: number, storyTitle: string, plotIndex: number, ): Promise { - const tokens = await getEnabledTokens(); - if (tokens.length === 0) return; - + const supabase = getSupabase(); const label = plotIndex === 0 ? "Genesis" : `Chapter ${plotIndex}`; + // Look up storyline token address + const { data: storyline } = await supabase + .from("storylines") + .select("token_address") + .eq("storyline_id", storylineId) + .eq("contract_address", STORY_FACTORY.toLowerCase()) + .single(); + + // Only notify token holders — no broadcast fallback + if (!storyline?.token_address) return; + + const tokens = await getTokenHolderTokens(storyline.token_address); + if (tokens.length === 0) return; + await sendNotification({ notificationId: `pl-new-plot-${storylineId}-${plotIndex}`, title: `New ${label} published`, @@ -203,3 +310,61 @@ export async function notifyNewPlot( tokens, }); } + +// ---- [#521] Price Change Alerts ---- + +const PRICE_CHANGE_THRESHOLD = 10; // percent + +/** + * Snapshot current price for a token and check for >10% change. + * If threshold exceeded, sends alert to all holders of that token. + */ +export async function checkPriceChangeAlert( + tokenAddress: string, + currentPrice: number, + storylineId: number, + storyTitle: string, +): Promise { + const supabase = getSupabase(); + + // Get previous snapshot + const { data: prev } = await supabase + .from("token_price_snapshots") + .select("price") + .eq("token_address", tokenAddress.toLowerCase()) + .order("snapshot_time", { ascending: false }) + .limit(1) + .single(); + + // Save current snapshot + await supabase.from("token_price_snapshots").insert({ + token_address: tokenAddress.toLowerCase(), + price: currentPrice, + }); + + if (!prev || !prev.price) return false; + + const previousPrice = Number(prev.price); + if (previousPrice === 0) return false; + + const changePercent = ((currentPrice - previousPrice) / previousPrice) * 100; + + if (Math.abs(changePercent) < PRICE_CHANGE_THRESHOLD) return false; + + // Alert holders + const tokens = await getTokenHolderTokens(tokenAddress); + if (tokens.length === 0) return false; + + const direction = changePercent > 0 ? "up" : "down"; + const absChange = Math.abs(changePercent).toFixed(1); + + await sendNotification({ + notificationId: `pl-price-alert-${tokenAddress}-${Date.now()}`, + title: `Price ${direction} ${absChange}%`, + body: `"${storyTitle.slice(0, 30)}" token moved ${direction} ${absChange}%`, + targetUrl: `${appUrl}/story/${storylineId}`, + tokens, + }); + + return true; +} diff --git a/package.json b/package.json index c2368d4e..119c35b8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "plotlink", - "version": "0.1.5", + "version": "0.1.6", "private": true, "workspaces": [ "packages/*" diff --git a/src/app/api/cron/price-alerts/route.ts b/src/app/api/cron/price-alerts/route.ts new file mode 100644 index 00000000..be0774ea --- /dev/null +++ b/src/app/api/cron/price-alerts/route.ts @@ -0,0 +1,68 @@ +import { NextResponse } from "next/server"; +import { createServerClient, type Storyline } from "../../../../../lib/supabase"; +import { getTokenPrice } from "../../../../../lib/price"; +import { publicClient } from "../../../../../lib/rpc"; +import { checkPriceChangeAlert } from "../../../../../lib/notifications.server"; +import { STORY_FACTORY } from "../../../../../lib/contracts/constants"; +import { type Address } from "viem"; + +/** + * [#521] Cron: check token prices and send alerts for >10% changes. + * Runs every ~5 minutes alongside the backfill cron. + */ +export async function GET(req: Request) { + const secret = process.env.CRON_SECRET; + if (secret) { + const authHeader = req.headers.get("authorization"); + if (authHeader !== `Bearer ${secret}`) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + } else if (process.env.NODE_ENV === "production") { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const supabase = createServerClient(); + if (!supabase) { + return NextResponse.json({ error: "Supabase not configured" }, { status: 500 }); + } + + // Get all active storylines with tokens + const { data: storylines } = await supabase + .from("storylines") + .select("*") + .eq("hidden", false) + .eq("sunset", false) + .neq("token_address", "") + .eq("contract_address", STORY_FACTORY.toLowerCase()) + .returns(); + + if (!storylines || storylines.length === 0) { + return NextResponse.json({ checked: 0 }); + } + + let checked = 0; + let alerts = 0; + + for (const sl of storylines) { + try { + const priceInfo = await getTokenPrice(sl.token_address as Address, publicClient); + if (!priceInfo) continue; + + const price = parseFloat(priceInfo.pricePerToken); + if (price <= 0) continue; + + const alerted = await checkPriceChangeAlert( + sl.token_address, + price, + sl.storyline_id, + sl.title, + ); + checked++; + if (alerted) alerts++; + } catch { + // Skip individual token errors + } + } + + return NextResponse.json({ checked, alerts }); +} diff --git a/supabase/migrations/00022_notification_targeting.sql b/supabase/migrations/00022_notification_targeting.sql new file mode 100644 index 00000000..b7efa067 --- /dev/null +++ b/supabase/migrations/00022_notification_targeting.sql @@ -0,0 +1,48 @@ +-- [#521] Targeted notifications for token holders +-- +-- 1. Add wallet_address to notification_tokens (links FID → wallet for on-chain lookups) +-- 2. Create notification_queue for batched, targeted delivery +-- 3. Create token_price_snapshots for price change detection + +-- 1. wallet_address on notification_tokens +ALTER TABLE notification_tokens + ADD COLUMN IF NOT EXISTS wallet_address TEXT; + +CREATE INDEX IF NOT EXISTS idx_notification_tokens_wallet + ON notification_tokens (wallet_address) + WHERE wallet_address IS NOT NULL; + +-- 2. Notification queue +CREATE TABLE IF NOT EXISTS notification_queue ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + target_fid INTEGER NOT NULL, + notification_type TEXT NOT NULL, + storyline_id BIGINT, + title TEXT NOT NULL, + body TEXT NOT NULL, + target_url TEXT NOT NULL, + status TEXT DEFAULT 'pending' + CHECK (status IN ('pending', 'sent', 'failed', 'skipped')), + error_message TEXT, + scheduled_at TIMESTAMPTZ DEFAULT NOW(), + sent_at TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_notification_queue_status + ON notification_queue (status, scheduled_at); + +CREATE INDEX IF NOT EXISTS idx_notification_queue_dedup + ON notification_queue (target_fid, notification_type, storyline_id) + WHERE status = 'pending'; + +-- 3. Token price snapshots (for >10% change alerts) +CREATE TABLE IF NOT EXISTS token_price_snapshots ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + token_address TEXT NOT NULL, + price NUMERIC NOT NULL, + snapshot_time TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_price_snapshots_token_time + ON token_price_snapshots (token_address, snapshot_time DESC);