Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 182 additions & 17 deletions lib/notifications.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
/**
* [#489] Farcaster notification system for PlotLink.
* [#489, #521] Farcaster notification system for PlotLink.
*
* Handles notification token storage (Supabase) and sending push
* notifications to Farcaster clients via the miniapp notification API.
*
* [#521] Targeted notifications: new plot notifications go only to
* storyline token holders. Price change alerts (>10%) sent to holders.
*/

import { createClient } from "@supabase/supabase-js";
import { createPublicClient, http, erc20Abi, type Address } from "viem";
import { base } from "viem/chains";
import { STORY_FACTORY } from "./contracts/constants";

const NEYNAR_BASE = "https://api.neynar.com/v2/farcaster";

const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL || "";
const supabaseServiceKey = process.env.SUPABASE_SERVICE_ROLE_KEY || "";
Expand All @@ -16,6 +24,13 @@ function getSupabase() {
});
}

function getRpcClient() {
return createPublicClient({
chain: base,
transport: http(process.env.NEXT_PUBLIC_RPC_URL),
});
}

export interface NotificationToken {
fid: number;
notificationToken: string;
Expand All @@ -32,17 +47,24 @@ export async function saveUserNotificationToken(
): Promise<void> {
const supabase = getSupabase();

const { error } = await supabase.from("notification_tokens").upsert(
{
fid,
notification_token: token,
notification_url: url,
client_app_fid: clientAppFid || null,
enabled: true,
updated_at: new Date().toISOString(),
},
{ onConflict: "fid" },
);
// Resolve wallet from FID via trusted Neynar API
const walletAddress = await resolveWalletForFid(fid);

const row: Record<string, unknown> = {
fid,
notification_token: token,
notification_url: url,
client_app_fid: clientAppFid || null,
enabled: true,
updated_at: new Date().toISOString(),
};
if (walletAddress) {
row.wallet_address = walletAddress;
}

const { error } = await supabase
.from("notification_tokens")
.upsert(row, { onConflict: "fid" });

if (error) {
console.error("Failed to save notification token:", error);
Expand Down Expand Up @@ -83,6 +105,79 @@ export async function getEnabledTokens(): Promise<NotificationToken[]> {
}));
}

// ---- [#521] FID → Wallet Resolution (trusted) ----

/**
* Resolve a Farcaster FID to its verified Ethereum address via Neynar API.
* Returns the first verified address, or null if unavailable.
*/
async function resolveWalletForFid(fid: number): Promise<string | null> {
const apiKey = process.env.NEYNAR_API_KEY;
if (!apiKey) return null;

try {
const res = await fetch(`${NEYNAR_BASE}/user/bulk?fids=${fid}`, {
headers: { accept: "application/json", "x-api-key": apiKey },
signal: AbortSignal.timeout(3000),
});
if (!res.ok) return null;
const json = await res.json();
const user = json.users?.[0];
if (!user) return null;

// Use first verified Ethereum address
const verifiedAddress = user.verified_addresses?.eth_addresses?.[0];
return verifiedAddress?.toLowerCase() ?? null;
} catch {
return null;
}
}

// ---- [#521] Token Holder Targeting ----

/**
* Get notification tokens for users who hold a specific storyline token.
* Queries enabled tokens with wallet_address, then checks on-chain balanceOf.
*/
async function getTokenHolderTokens(
tokenAddress: string,
): Promise<NotificationToken[]> {
const supabase = getSupabase();
const rpc = getRpcClient();

// Get all enabled tokens that have a wallet_address
const { data, error } = await supabase
.from("notification_tokens")
.select("*")
.eq("enabled", true)
.not("wallet_address", "is", null);

if (error || !data || data.length === 0) return [];

// Check on-chain balances via multicall
const balanceResults = await rpc.multicall({
contracts: data.map((row) => ({
address: tokenAddress as Address,
abi: erc20Abi,
functionName: "balanceOf" as const,
args: [row.wallet_address as Address],
})),
allowFailure: true,
});

// Filter to holders (balance > 0)
return data
.filter((_, i) => {
const result = balanceResults[i];
return result.status === "success" && (result.result as bigint) > BigInt(0);
})
.map((row) => ({
fid: row.fid,
notificationToken: row.notification_token,
notificationUrl: row.notification_url,
}));
}

// ---- Notification Sending ----

export async function sendNotification(params: {
Expand Down Expand Up @@ -182,19 +277,31 @@ export async function notifyNewStoryline(
}

/**
* Notify all users with enabled notifications about a new plot.
* Called from the backfill cron when a new plot is indexed.
* [#521] Notify token holders about a new plot in a storyline they hold.
* Falls back to all users if no token address or no holders found.
*/
export async function notifyNewPlot(
storylineId: number,
storyTitle: string,
plotIndex: number,
): Promise<void> {
const tokens = await getEnabledTokens();
if (tokens.length === 0) return;

const supabase = getSupabase();
const label = plotIndex === 0 ? "Genesis" : `Chapter ${plotIndex}`;

// Look up storyline token address
const { data: storyline } = await supabase
.from("storylines")
.select("token_address")
.eq("storyline_id", storylineId)
.eq("contract_address", STORY_FACTORY.toLowerCase())
.single();

// Only notify token holders — no broadcast fallback
if (!storyline?.token_address) return;

const tokens = await getTokenHolderTokens(storyline.token_address);
if (tokens.length === 0) return;

await sendNotification({
notificationId: `pl-new-plot-${storylineId}-${plotIndex}`,
title: `New ${label} published`,
Expand All @@ -203,3 +310,61 @@ export async function notifyNewPlot(
tokens,
});
}

// ---- [#521] Price Change Alerts ----

const PRICE_CHANGE_THRESHOLD = 10; // percent

/**
* Snapshot current price for a token and check for >10% change.
* If threshold exceeded, sends alert to all holders of that token.
*/
export async function checkPriceChangeAlert(
tokenAddress: string,
currentPrice: number,
storylineId: number,
storyTitle: string,
): Promise<boolean> {
const supabase = getSupabase();

// Get previous snapshot
const { data: prev } = await supabase
.from("token_price_snapshots")
.select("price")
.eq("token_address", tokenAddress.toLowerCase())
.order("snapshot_time", { ascending: false })
.limit(1)
.single();

// Save current snapshot
await supabase.from("token_price_snapshots").insert({
token_address: tokenAddress.toLowerCase(),
price: currentPrice,
});

if (!prev || !prev.price) return false;

const previousPrice = Number(prev.price);
if (previousPrice === 0) return false;

const changePercent = ((currentPrice - previousPrice) / previousPrice) * 100;

if (Math.abs(changePercent) < PRICE_CHANGE_THRESHOLD) return false;

// Alert holders
const tokens = await getTokenHolderTokens(tokenAddress);
if (tokens.length === 0) return false;

const direction = changePercent > 0 ? "up" : "down";
const absChange = Math.abs(changePercent).toFixed(1);

await sendNotification({
notificationId: `pl-price-alert-${tokenAddress}-${Date.now()}`,
title: `Price ${direction} ${absChange}%`,
body: `"${storyTitle.slice(0, 30)}" token moved ${direction} ${absChange}%`,
targetUrl: `${appUrl}/story/${storylineId}`,
tokens,
});

return true;
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "plotlink",
"version": "0.1.5",
"version": "0.1.6",
"private": true,
"workspaces": [
"packages/*"
Expand Down
68 changes: 68 additions & 0 deletions src/app/api/cron/price-alerts/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { NextResponse } from "next/server";
import { createServerClient, type Storyline } from "../../../../../lib/supabase";
import { getTokenPrice } from "../../../../../lib/price";
import { publicClient } from "../../../../../lib/rpc";
import { checkPriceChangeAlert } from "../../../../../lib/notifications.server";
import { STORY_FACTORY } from "../../../../../lib/contracts/constants";
import { type Address } from "viem";

/**
* [#521] Cron: check token prices and send alerts for >10% changes.
* Runs every ~5 minutes alongside the backfill cron.
*/
export async function GET(req: Request) {
const secret = process.env.CRON_SECRET;
if (secret) {
const authHeader = req.headers.get("authorization");
if (authHeader !== `Bearer ${secret}`) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
} else if (process.env.NODE_ENV === "production") {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}

const supabase = createServerClient();
if (!supabase) {
return NextResponse.json({ error: "Supabase not configured" }, { status: 500 });
}

// Get all active storylines with tokens
const { data: storylines } = await supabase
.from("storylines")
.select("*")
.eq("hidden", false)
.eq("sunset", false)
.neq("token_address", "")
.eq("contract_address", STORY_FACTORY.toLowerCase())
.returns<Storyline[]>();

if (!storylines || storylines.length === 0) {
return NextResponse.json({ checked: 0 });
}

let checked = 0;
let alerts = 0;

for (const sl of storylines) {
try {
const priceInfo = await getTokenPrice(sl.token_address as Address, publicClient);
if (!priceInfo) continue;

const price = parseFloat(priceInfo.pricePerToken);
if (price <= 0) continue;

const alerted = await checkPriceChangeAlert(
sl.token_address,
price,
sl.storyline_id,
sl.title,
);
checked++;
if (alerted) alerts++;
} catch {
// Skip individual token errors
}
}

return NextResponse.json({ checked, alerts });
}
48 changes: 48 additions & 0 deletions supabase/migrations/00022_notification_targeting.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- [#521] Targeted notifications for token holders
--
-- 1. Add wallet_address to notification_tokens (links FID → wallet for on-chain lookups)
-- 2. Create notification_queue for batched, targeted delivery
-- 3. Create token_price_snapshots for price change detection

-- 1. wallet_address on notification_tokens
ALTER TABLE notification_tokens
ADD COLUMN IF NOT EXISTS wallet_address TEXT;

CREATE INDEX IF NOT EXISTS idx_notification_tokens_wallet
ON notification_tokens (wallet_address)
WHERE wallet_address IS NOT NULL;

-- 2. Notification queue
CREATE TABLE IF NOT EXISTS notification_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
target_fid INTEGER NOT NULL,
notification_type TEXT NOT NULL,
storyline_id BIGINT,
title TEXT NOT NULL,
body TEXT NOT NULL,
target_url TEXT NOT NULL,
status TEXT DEFAULT 'pending'
CHECK (status IN ('pending', 'sent', 'failed', 'skipped')),
error_message TEXT,
scheduled_at TIMESTAMPTZ DEFAULT NOW(),
sent_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_notification_queue_status
ON notification_queue (status, scheduled_at);

CREATE INDEX IF NOT EXISTS idx_notification_queue_dedup
ON notification_queue (target_fid, notification_type, storyline_id)
WHERE status = 'pending';

-- 3. Token price snapshots (for >10% change alerts)
CREATE TABLE IF NOT EXISTS token_price_snapshots (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
token_address TEXT NOT NULL,
price NUMERIC NOT NULL,
snapshot_time TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_price_snapshots_token_time
ON token_price_snapshots (token_address, snapshot_time DESC);
Loading