Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import {
PolymarketMarketGraphqlProcessed,
isInitialConfirmationLogged,
fetchLatestAIDeepLink,
OrderFilledEventWithTrade,
fetchOrderFilledEvents,
} from "./common";
import * as common from "./common";

Expand All @@ -47,6 +49,12 @@ function getThresholds() {
}

const blocksPerSecond = POLYGON_BLOCKS_PER_HOUR / 3_600;
type ProposalProcessingContext = {
currentBlock?: number;
lookbackBlocks?: number;
gapBlocks?: number;
orderFilledEvents?: OrderFilledEventWithTrade[];
};

function outcomeIndexes(
isSportsMarket: boolean,
Expand Down Expand Up @@ -96,14 +104,16 @@ export async function processProposal(
markets: PolymarketMarketGraphqlProcessed[],
orderbooks: Record<string, MarketOrderbook>,
params: MonitoringParams,
logger: typeof Logger
logger: typeof Logger,
context?: ProposalProcessingContext
): Promise<boolean /* notified */> {
const thresholds = getThresholds();
const isSportsRequest = proposal.requester === params.ctfSportsOracleAddress;

const currentBlock = await params.provider.getBlockNumber();
const lookbackBlocks = Math.round(params.fillEventsLookbackSeconds * blocksPerSecond);
const gapBlocks = Math.round(params.fillEventsProposalGapSeconds * blocksPerSecond);
const currentBlock = context?.currentBlock ?? (await params.provider.getBlockNumber());
const lookbackBlocks =
context?.lookbackBlocks ?? Math.round(params.fillEventsLookbackSeconds * blocksPerSecond);
const gapBlocks = context?.gapBlocks ?? Math.round(params.fillEventsProposalGapSeconds * blocksPerSecond);
const proposalGapStartBlock = Number(proposal.proposalBlockNumber) + gapBlocks;

const checkMarket = async (market: PolymarketMarketGraphqlProcessed): Promise<boolean> => {
Expand All @@ -122,7 +132,10 @@ export async function processProposal(
const buyingLoserSide = books[outcome.loser].bids.find((b) => b.price > thresholds.bids);

const fromBlock = Math.max(proposalGapStartBlock, currentBlock - lookbackBlocks);
const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock);
const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock, {
cachedEvents: context?.orderFilledEvents,
toBlock: currentBlock,
});

const soldWinner = fills[outcome.winner].filter((f) => f.type === "sell" && f.price < thresholds.asks);
const boughtLoser = fills[outcome.loser].filter((f) => f.type === "buy" && f.price > thresholds.bids);
Expand Down Expand Up @@ -313,10 +326,31 @@ export async function monitorTransactionsProposedOrderBook(
activeBundles = survivingBundles;
}

if (!activeBundles.length) {
console.log("All proposals have been checked!");
return;
}

const lookbackBlocks = Math.round(params.fillEventsLookbackSeconds * blocksPerSecond);
const gapBlocks = Math.round(params.fillEventsProposalGapSeconds * blocksPerSecond);
const currentBlock = await params.provider.getBlockNumber();

const fromBlocks = activeBundles.map(({ proposal }) =>
Math.max(Number(proposal.proposalBlockNumber) + gapBlocks, currentBlock - lookbackBlocks)
);
const earliestFromBlock = Math.min(...fromBlocks);
const orderFilledEventsPromise = fetchOrderFilledEvents(params, earliestFromBlock, currentBlock);

await Promise.all(
activeBundles.map(async ({ proposal, markets }) => {
try {
const alerted = await processProposal(proposal, markets, orderbookMap, params, logger);
const sharedOrderFilledEvents = await orderFilledEventsPromise;
const alerted = await processProposal(proposal, markets, orderbookMap, params, logger, {
currentBlock,
lookbackBlocks,
gapBlocks,
orderFilledEvents: sharedOrderFilledEvents,
});
if (alerted) await persistNotified(proposal, logger);
} catch (err) {
await logErrorAndPersist(proposal, err as Error);
Expand Down
165 changes: 137 additions & 28 deletions packages/monitor-v2/src/monitor-polymarket/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getRetryProvider, paginatedEventQuery as umaPaginatedEventQuery } from "@uma/common";
import { createHttpClient } from "@uma/toolkit";
import { AxiosError, AxiosInstance } from "axios";
import { AxiosError, AxiosInstance, AxiosRequestConfig } from "axios";
export const paginatedEventQuery = umaPaginatedEventQuery;

import type { Provider } from "@ethersproject/abstract-provider";
Expand Down Expand Up @@ -75,10 +75,12 @@ export interface MonitoringParams {
fillEventsLookbackSeconds: number;
fillEventsProposalGapSeconds: number;
httpClient: ReturnType<typeof createHttpClient>;
aiDeeplinkHttpClient: ReturnType<typeof createHttpClient>;
orderBookBatchSize: number;
ooV2Addresses: string[];
ooV1Addresses: string[];
aiConfig?: AIConfig;
aiDeeplinkTimeout: number;
}
interface PolymarketMarketGraphql {
question: string;
Expand All @@ -105,6 +107,13 @@ export interface PolymarketTradeInformation {
timestamp: number;
}

export interface OrderFilledEventWithTrade {
blockNumber: number;
makerAssetId: string;
takerAssetId: string;
trade: PolymarketTradeInformation;
}

export interface PolymarketOrderBook {
market: string;
asset_id: string;
Expand Down Expand Up @@ -376,40 +385,39 @@ export const getPolymarketMarketInformation = async (

const getTradeInfoFromOrderFilledEvent = async (
provider: Provider,
event: any
event: any,
blockTimestamp?: number
): Promise<PolymarketTradeInformation> => {
const blockTimestamp = (await provider.getBlock(event.blockNumber)).timestamp;
const timestamp = blockTimestamp ?? (await provider.getBlock(event.blockNumber)).timestamp;
const isBuy = event.args.makerAssetId.toString() === "0";
const numerator = (isBuy ? event.args.makerAmountFilled : event.args.takerAmountFilled).mul(1000);
const denominator = isBuy ? event.args.takerAmountFilled : event.args.makerAmountFilled;
const price = numerator.div(denominator).toNumber() / 1000;
return {
price,
type: isBuy ? "buy" : "sell",
timestamp: blockTimestamp,
timestamp,
// Convert to decimal value with 2 decimals
amount: (isBuy ? event.args.takerAmountFilled : event.args.makerAmountFilled).div(10_000).toNumber() / 100,
};
};

export const getOrderFilledEvents = async (
export const fetchOrderFilledEvents = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
startBlockNumber: number,
endBlockNumber?: number
): Promise<OrderFilledEventWithTrade[]> => {
const ctfExchange = new ethers.Contract(
params.ctfExchangeAddress,
require("./abi/ctfExchange.json"),
params.provider
);

const currentBlockNumber = await params.provider.getBlockNumber();
const maxBlockLookBack = params.maxBlockLookBack;

const toBlock = endBlockNumber ?? (await params.provider.getBlockNumber());
const searchConfig = {
fromBlock: startBlockNumber,
toBlock: currentBlockNumber,
maxBlockLookBack,
toBlock,
maxBlockLookBack: params.maxBlockLookBack,
};

const events: Event[] = await paginatedEventQuery(
Expand All @@ -420,25 +428,61 @@ export const getOrderFilledEvents = async (
queryFilterSafe
);

const outcomeTokenOne = await Promise.all(
events
.filter((event) => {
return [event?.args?.takerAssetId.toString(), event?.args?.makerAssetId.toString()].includes(clobTokenIds[0]);
})
.map((event) => getTradeInfoFromOrderFilledEvent(params.provider, event))
);
const blockTimestamps = new Map<number, number>();
const getTimestamp = async (blockNumber: number): Promise<number> => {
if (!blockTimestamps.has(blockNumber)) {
blockTimestamps.set(blockNumber, (await params.provider.getBlock(blockNumber)).timestamp);
}
return blockTimestamps.get(blockNumber)!;
};

const outcomeTokenTwo = await Promise.all(
events
.filter((event) => {
return [event?.args?.takerAssetId.toString(), event?.args?.makerAssetId.toString()].includes(clobTokenIds[1]);
})
.map((event) => getTradeInfoFromOrderFilledEvent(params.provider, event))
return Promise.all(
events.map(async (event) => {
const blockTimestamp = await getTimestamp(event.blockNumber);
return {
blockNumber: event.blockNumber,
makerAssetId: event?.args?.makerAssetId.toString(),
takerAssetId: event?.args?.takerAssetId.toString(),
trade: await getTradeInfoFromOrderFilledEvent(params.provider, event, blockTimestamp),
};
})
);
};

export const filterOrderFilledEvents = (
orderFilledEvents: OrderFilledEventWithTrade[],
clobTokenIds: [string, string],
startBlockNumber: number
): [PolymarketTradeInformation[], PolymarketTradeInformation[]] => {
const [tokenOne, tokenTwo] = clobTokenIds;
const eventsWithinWindow = orderFilledEvents.filter((event) => event.blockNumber >= startBlockNumber);

const outcomeTokenOne = eventsWithinWindow
.filter((event) => [event.takerAssetId, event.makerAssetId].includes(tokenOne))
.map((event) => event.trade);

const outcomeTokenTwo = eventsWithinWindow
.filter((event) => [event.takerAssetId, event.makerAssetId].includes(tokenTwo))
.map((event) => event.trade);

return [outcomeTokenOne, outcomeTokenTwo];
};

export const getOrderFilledEvents = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number,
opts?: {
toBlock?: number;
cachedEvents?: OrderFilledEventWithTrade[];
}
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
const orderFilledEvents =
opts?.cachedEvents ?? (await fetchOrderFilledEvents(params, startBlockNumber, opts?.toBlock));

return filterOrderFilledEvents(orderFilledEvents, clobTokenIds, startBlockNumber);
};

export const calculatePolymarketQuestionID = (ancillaryData: string): string => {
return ethers.utils.keccak256(ancillaryData);
};
Expand Down Expand Up @@ -712,16 +756,25 @@ export async function fetchLatestAIDeepLink(
if (!params.aiConfig) {
return { deeplink: undefined };
}
const startTime = Date.now();
try {
const questionId = calculatePolymarketQuestionID(proposal.ancillaryData);
const response = await params.httpClient.get<UMAAIRetriesLatestResponse>(params.aiConfig.apiUrl, {
const requestConfig: AxiosRequestConfig = {
params: {
limit: 50,
search: proposal.proposalHash,
last_page: false,
project_id: params.aiConfig.projectId,
},
});
};

requestConfig.timeout = params.aiDeeplinkTimeout;

const response = await params.aiDeeplinkHttpClient.get<UMAAIRetriesLatestResponse>(
params.aiConfig.apiUrl,
requestConfig
);
const duration = Date.now() - startTime;

const result = response.data?.elements?.find(
(element) => element.data.input.timing?.expiration_timestamp === proposal.proposalExpirationTimestamp.toNumber()
Expand All @@ -739,20 +792,52 @@ export async function fetchLatestAIDeepLink(
status: response.status,
statusText: response.statusText,
},
durationMs: duration,
notificationPath: "otb-monitoring",
});
return { deeplink: undefined };
}

logger.debug({
at: "PolymarketMonitor",
message: "Successfully fetched AI deeplink",
proposalHash: proposal.proposalHash,
durationMs: duration,
});

return {
deeplink: `${params.aiConfig.resultsBaseUrl}/${result.id}`,
};
} catch (error) {
const duration = Date.now() - startTime;
const axiosError = error as AxiosError;

logger.debug({
at: "PolymarketMonitor",
message: "Failed to fetch AI deeplink",
err: error instanceof Error ? error.message : String(error),
proposalHash: proposal.proposalHash,
durationMs: duration,
errorDetails: {
code: axiosError?.code,
response: axiosError?.response
? {
status: axiosError.response?.status,
statusText: axiosError.response?.statusText,
headers: axiosError.response?.headers,
}
: undefined,
request: axiosError?.config
? {
url: axiosError.config?.url,
method: axiosError.config?.method,
timeout: axiosError.config?.timeout,
baseURL: axiosError.config?.baseURL,
}
: undefined,
isTimeout:
axiosError?.code === "ECONNABORTED" || (error instanceof Error && error.message?.includes("timeout")),
},
});
return { deeplink: undefined };
}
Expand Down Expand Up @@ -902,6 +987,7 @@ export const initMonitoringParams = async (
const minTimeBetweenRequests = env.MIN_TIME_BETWEEN_REQUESTS ? Number(env.MIN_TIME_BETWEEN_REQUESTS) : 200;

const httpTimeout = env.HTTP_TIMEOUT ? Number(env.HTTP_TIMEOUT) : 10_000;
const aiDeeplinkTimeout = env.AI_DEEPLINK_TIMEOUT ? Number(env.AI_DEEPLINK_TIMEOUT) : 10_000;

const shouldResetTimeout = env.SHOULD_RESET_TIMEOUT !== "false";

Expand All @@ -924,6 +1010,27 @@ export const initMonitoringParams = async (
},
});

// Create a separate HTTP client for AI deeplink requests with unlimited concurrency
// This prevents AI deeplink requests from being queued behind other rate-limited requests
const aiDeeplinkHttpClient = createHttpClient({
axios: { timeout: aiDeeplinkTimeout },
rateLimit: { maxConcurrent: null, minTime: 0 }, // No rate limiting - unlimited concurrency
retry: {
retries: retryAttempts,
baseDelayMs: retryDelayMs,
shouldResetTimeout: false, // Don't reset timeout on retries - keep total time bounded by single timeout + retry delays
onRetry: (retryCount, err, config) => {
logger.debug({
at: "PolymarketMonitor",
message: `ai-deeplink-retry attempt #${retryCount} for ${config?.url}`,
error: err.code || err.message,
retryCount,
timeout: config?.timeout,
});
},
},
});

const ooV2Addresses = parseEnvList(env, "OOV2_ADDRESSES", [await getAddress("OptimisticOracleV2", chainId)]);
const ooV1Addresses = parseEnvList(env, "OOV1_ADDRESSES", [await getAddress("OptimisticOracle", chainId)]);

Expand All @@ -947,10 +1054,12 @@ export const initMonitoringParams = async (
fillEventsLookbackSeconds,
fillEventsProposalGapSeconds,
httpClient,
aiDeeplinkHttpClient,
orderBookBatchSize,
ooV2Addresses,
ooV1Addresses,
aiConfig,
aiDeeplinkTimeout,
};
};

Expand Down
Loading