diff --git a/src/dao.ts b/src/dao.ts index af1ee04..75c79e3 100644 --- a/src/dao.ts +++ b/src/dao.ts @@ -293,6 +293,19 @@ export class DAO { PRIMARY KEY (token0, token1, hour) ); + CREATE TABLE IF NOT EXISTS hourly_twap_data + ( + token0 NUMERIC, + token1 NUMERIC, + hour timestamptz, + liquidity_weighted_tick NUMERIC, + total_liquidity_seconds NUMERIC, + total_time_seconds NUMERIC, + swap_count NUMERIC, + pool_count NUMERIC, + PRIMARY KEY (token0, token1, hour) + ); + CREATE TABLE IF NOT EXISTS hourly_tvl_delta_by_token ( @@ -985,6 +998,118 @@ export class DAO { values: [since], }); + await this.pg.query({ + text: ` + INSERT INTO hourly_twap_data + (WITH swap_times AS ( + SELECT s.pool_key_hash, + pk.token0, + pk.token1, + date_bin(INTERVAL '1 hour', b.time, '2000-01-01 00:00:00'::TIMESTAMP WITHOUT TIME ZONE) AS hour, + b.time, + s.tick_after, + s.liquidity_after, + LAG(b.time) OVER (PARTITION BY s.pool_key_hash, + date_bin(INTERVAL '1 hour', b.time, '2000-01-01 00:00:00'::TIMESTAMP WITHOUT TIME ZONE) + ORDER BY b.time) AS prev_time, + LAG(s.tick_after) OVER (PARTITION BY s.pool_key_hash, + date_bin(INTERVAL '1 hour', b.time, '2000-01-01 00:00:00'::TIMESTAMP WITHOUT TIME ZONE) + ORDER BY b.time) AS prev_tick, + LAG(s.liquidity_after) OVER (PARTITION BY s.pool_key_hash, + date_bin(INTERVAL '1 hour', b.time, '2000-01-01 00:00:00'::TIMESTAMP WITHOUT TIME ZONE) + ORDER BY b.time) AS prev_liquidity + FROM swaps s + JOIN event_keys ek ON s.event_id = ek.id + JOIN blocks b ON ek.block_number = b.number + JOIN pool_keys pk ON s.pool_key_hash = pk.key_hash + WHERE DATE_TRUNC('hour', b.time) >= DATE_TRUNC('hour', $1::timestamptz) + AND delta0 != 0 + AND delta1 != 0 + ), + tick_liquidity_durations AS ( + SELECT pool_key_hash, + token0, + token1, + hour, + tick_after, + liquidity_after, + CASE + WHEN prev_time IS NULL THEN + EXTRACT(EPOCH FROM (time - hour)) + ELSE + EXTRACT(EPOCH FROM (time - prev_time)) + END AS duration_seconds, + CASE + WHEN prev_tick IS NULL THEN tick_after + ELSE prev_tick + END AS effective_tick, + CASE + WHEN prev_liquidity IS NULL THEN liquidity_after + ELSE prev_liquidity + END AS effective_liquidity + FROM swap_times + UNION ALL + SELECT pool_key_hash, + token0, + token1, + hour, + tick_after, + liquidity_after, + EXTRACT(EPOCH FROM ((hour + INTERVAL '1 hour') - time)) AS duration_seconds, + tick_after AS effective_tick, + liquidity_after AS effective_liquidity + FROM ( + SELECT DISTINCT ON (pool_key_hash, hour) + pool_key_hash, token0, token1, hour, time, tick_after, liquidity_after + FROM swap_times + ORDER BY pool_key_hash, hour, time DESC + ) last_swaps + ), + pool_twaps AS ( + SELECT pool_key_hash, + token0, + token1, + hour, + SUM(effective_tick * duration_seconds * effective_liquidity) AS tick_liquidity_seconds, + SUM(duration_seconds * effective_liquidity) AS liquidity_seconds, + SUM(duration_seconds) AS total_time_seconds, + COUNT(*) / 2 AS swap_count + FROM tick_liquidity_durations + WHERE duration_seconds > 0 AND effective_liquidity > 0 + GROUP BY pool_key_hash, token0, token1, hour + ), + aggregated_twaps AS ( + SELECT token0, + token1, + hour, + SUM(tick_liquidity_seconds) / NULLIF(SUM(liquidity_seconds), 0) AS liquidity_weighted_tick, + SUM(liquidity_seconds) AS total_liquidity_seconds, + SUM(total_time_seconds) AS total_time_seconds, + SUM(swap_count) AS swap_count, + COUNT(DISTINCT pool_key_hash) AS pool_count + FROM pool_twaps + GROUP BY token0, token1, hour + ) + SELECT token0, + token1, + hour, + liquidity_weighted_tick, + total_liquidity_seconds, + total_time_seconds, + swap_count, + pool_count + FROM aggregated_twaps + WHERE liquidity_weighted_tick IS NOT NULL) + ON CONFLICT (token0, token1, hour) + DO UPDATE SET liquidity_weighted_tick = excluded.liquidity_weighted_tick, + total_liquidity_seconds = excluded.total_liquidity_seconds, + total_time_seconds = excluded.total_time_seconds, + swap_count = excluded.swap_count, + pool_count = excluded.pool_count; + `, + values: [since], + }); + await this.pg.query({ text: ` INSERT INTO hourly_tvl_delta_by_token