Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions lib/user-upsert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Shared user upsert logic for register-by-wallet and onboard routes.
* Ensures consistent conflict resolution so concurrent calls cannot corrupt rows.
*/

import type { SupabaseClient } from "@supabase/supabase-js";
import type { Database } from "./supabase";

type UserInsert = Database["public"]["Tables"]["users"]["Insert"];
type UserRow = Database["public"]["Tables"]["users"]["Row"];

/**
* Find an existing user by wallet address.
* Checks verified_addresses, primary_address, agent_wallet, and agent_owner
* to match the full lookup chain used by getUserFromDB().
*/
export async function findUserByWallet(
supabase: SupabaseClient<Database>,
normalizedAddress: string,
): Promise<UserRow | null> {
const { data: byVerified } = await supabase
.from("users")
.select("*")
.contains("verified_addresses", [normalizedAddress])
.single();

if (byVerified) return byVerified;

const { data: byPrimary } = await supabase
.from("users")
.select("*")
.eq("primary_address", normalizedAddress)
.single();

if (byPrimary) return byPrimary;

const { data: byAgentWallet } = await supabase
.from("users")
.select("*")
.eq("agent_wallet", normalizedAddress)
.single();

if (byAgentWallet) return byAgentWallet;

const { data: byAgentOwner } = await supabase
.from("users")
.select("*")
.eq("agent_owner", normalizedAddress)
.single();

return byAgentOwner ?? null;
}

/**
* Upsert a user row with consistent conflict resolution.
* If an existing user is known, updates by id (most reliable).
* If inserting and a unique violation occurs, re-queries and updates by id.
*/
export async function upsertUser(
supabase: SupabaseClient<Database>,
userData: UserInsert,
normalizedAddress: string,
existingUser: UserRow | null,
): Promise<{ data: UserRow | null; error: string | null }> {
// Known existing user — always update by id
if (existingUser) {
const { data, error } = await supabase
.from("users")
.update(userData)
.eq("id", existingUser.id)
.select()
.single();

if (error) return { data: null, error: error.message };
return { data, error: null };
}

// New user — attempt insert
const { data: insertData, error: insertError } = await supabase
.from("users")
.insert(userData)
.select()
.single();

if (!insertError) return { data: insertData, error: null };

// Unique violation — another concurrent call inserted first.
// Re-query to get the row's id, then update by id.
if (insertError.code === "23505") {
const raceUser = await findUserByWallet(supabase, normalizedAddress);
if (raceUser) {
const { data, error } = await supabase
.from("users")
.update(userData)
.eq("id", raceUser.id)
.select()
.single();

if (error) return { data: null, error: error.message };
return { data, error: null };
}
return { data: null, error: "Conflict but user not found on retry" };
}

return { data: null, error: insertError.message };
}
83 changes: 14 additions & 69 deletions src/app/api/user/onboard/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { buildUserData } from "../../../../../lib/user-data";
import { getAgentMetadata, getAgentMetadataById, erc8004Abi } from "../../../../../lib/contracts/erc8004";
import { ERC8004_REGISTRY } from "../../../../../lib/contracts/constants";
import { publicClient } from "../../../../../lib/rpc";
import { findUserByWallet, upsertUser } from "../../../../../lib/user-upsert";
import type { Address } from "viem";

const COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes
Expand Down Expand Up @@ -37,22 +38,7 @@ export async function POST(request: NextRequest) {
}

// Check existing user (by verified_addresses or primary_address)
let existingUser = null;
const { data: byVerified } = await supabase
.from("users")
.select("*")
.contains("verified_addresses", [normalizedAddress])
.single();
if (byVerified) {
existingUser = byVerified;
} else {
const { data: byPrimary } = await supabase
.from("users")
.select("*")
.eq("primary_address", normalizedAddress)
.single();
existingUser = byPrimary;
}
const existingUser = await findUserByWallet(supabase, normalizedAddress);

// Enforce 5-min cooldown on ALL refreshes
if (existingUser?.steemhunt_fetched_at) {
Expand Down Expand Up @@ -161,60 +147,19 @@ export async function POST(request: NextRequest) {
}
}

// Upsert — update by existing row identity
if (existingUser) {
const { data, error } = await supabase
.from("users")
.update(userData)
.eq("id", existingUser.id)
.select()
.single();

if (error) {
console.error("[onboard] Update error:", error);
return NextResponse.json(
{ error: "Failed to update user data" },
{ status: 500 },
);
}
return NextResponse.json({ success: true, user: data });
} else {
const { data: insertData, error: insertError } = await supabase
.from("users")
.insert(userData)
.select()
.single();

if (insertError) {
if (insertError.code === "23505") {
// Unique violation — update by conflicting identity
const updateQuery = supabase.from("users").update(userData);
const conditioned =
userData.fid != null
? updateQuery.eq("fid", userData.fid)
: updateQuery.eq("primary_address", normalizedAddress);

const { data: updateData, error: updateError } = await conditioned
.select()
.single();

if (updateError) {
console.error("[onboard] Update error:", updateError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}
return NextResponse.json({ success: true, user: updateData });
}
console.error("[onboard] Insert error:", insertError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}
return NextResponse.json({ success: true, user: insertData });
const { data: finalData, error: upsertError } = await upsertUser(
supabase, userData, normalizedAddress, existingUser,
);

if (upsertError) {
console.error("[onboard] Upsert error:", upsertError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}

return NextResponse.json({ success: true, user: finalData });
} catch (error) {
console.error("[onboard] Error:", error);
return NextResponse.json(
Expand Down
65 changes: 11 additions & 54 deletions src/app/api/user/register-by-wallet/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { getUserByWallet } from "../../../../../lib/farcaster-indexer";
import { lookupByAddress } from "../../../../../lib/farcaster";
import { fetchQuotientScore } from "../../../../../lib/quotient";
import { buildUserData } from "../../../../../lib/user-data";
import { findUserByWallet, upsertUser } from "../../../../../lib/user-upsert";

/**
* POST /api/user/register-by-wallet
Expand Down Expand Up @@ -32,23 +33,7 @@ export async function POST(request: NextRequest) {
}

// Check if user exists (by verified_addresses or primary_address)
let existingUser = null;
const { data: byVerified } = await supabase
.from("users")
.select("*")
.contains("verified_addresses", [normalizedAddress])
.single();

if (byVerified) {
existingUser = byVerified;
} else {
const { data: byPrimary } = await supabase
.from("users")
.select("*")
.eq("primary_address", normalizedAddress)
.single();
existingUser = byPrimary;
}
const existingUser = await findUserByWallet(supabase, normalizedAddress);

// If user exists and data is fresh (< 5 min), return cached
if (existingUser?.steemhunt_fetched_at) {
Expand Down Expand Up @@ -100,44 +85,16 @@ export async function POST(request: NextRequest) {
quotientData,
});

// Upsert: INSERT then UPDATE on conflict
const { data: insertData, error: insertError } = await supabase
.from("users")
.insert(userData)
.select()
.single();

let finalData = insertData;

if (insertError) {
if (insertError.code === "23505") {
// Unique violation — update by the conflicting identity
const updateQuery = supabase.from("users").update(userData);
const conditioned = existingUser
? updateQuery.eq("id", existingUser.id)
: userData.fid != null
? updateQuery.eq("fid", userData.fid)
: updateQuery.eq("primary_address", normalizedAddress);

const { data: updateData, error: updateError } = await conditioned
.select()
.single();
const { data: finalData, error: upsertError } = await upsertUser(
supabase, userData, normalizedAddress, existingUser,
);

if (updateError) {
console.error("[register-by-wallet] Update error:", updateError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}
finalData = updateData;
} else {
console.error("[register-by-wallet] Insert error:", insertError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}
if (upsertError) {
console.error("[register-by-wallet] Upsert error:", upsertError);
return NextResponse.json(
{ error: "Failed to save user data" },
{ status: 500 },
);
}

return NextResponse.json({ success: true, user: finalData });
Expand Down
Loading