Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules/
dist/
.env
.agents/
21 changes: 21 additions & 0 deletions apps/api/knexfile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { Knex } from 'knex';

const config: { [key: string]: Knex.Config } = {
development: {
client: 'pg',
connection: process.env.DATABASE_URL || 'postgresql://postgres:password@localhost:5432/watchtower',
migrations: {
directory: './migrations',
loadExtensions: ['.ts', '.js'],
},
},
production: {
client: 'pg',
connection: process.env.DATABASE_URL,
migrations: {
directory: './migrations',
},
},
};

export default config;
57 changes: 57 additions & 0 deletions apps/api/migrations/20240416110000_create_auth_tables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
// users table
await knex.schema.createTable('users', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table.string('email').unique().notNullable();
table.string('password_hash').nullable(); // Nullable for OAuth-only users
table.string('github_id').unique().nullable();
table.string('display_name').notNullable();
table.timestamps(true, true); // Adds created_at and updated_at

table.index('email');
table.index('github_id');
});

// sessions table
await knex.schema.createTable('sessions', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table
.uuid('user_id')
.notNullable()
.references('id')
.inTable('users')
.onDelete('CASCADE');
table.string('token_hash').notNullable();
table.timestamp('expires_at').notNullable();
table.timestamp('created_at').defaultTo(knex.fn.now());

table.index('token_hash');
});

// api_keys table
await knex.schema.createTable('api_keys', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table
.uuid('user_id')
.notNullable()
.references('id')
.inTable('users')
.onDelete('CASCADE');
table.string('key_hash').notNullable();
table.string('label').notNullable();
table.timestamp('last_used_at').nullable();
table.timestamp('created_at').defaultTo(knex.fn.now());
table.timestamp('revoked_at').nullable();

table.index('key_hash');
});
}

export async function down(knex: Knex): Promise<void> {
// Drop in reverse order of foreign keys
await knex.schema.dropTableIfExists('api_keys');
await knex.schema.dropTableIfExists('sessions');
await knex.schema.dropTableIfExists('users');
}
62 changes: 62 additions & 0 deletions apps/api/migrations/20240416120000_create_contract_tables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
// Create native ENUM types for PG
await knex.raw("CREATE TYPE account_tier AS ENUM ('free', 'pro', 'team', 'enterprise')");
await knex.raw("CREATE TYPE network_type AS ENUM ('mainnet', 'testnet')");

// accounts table
await knex.schema.createTable('accounts', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table
.uuid('user_id')
.unique()
.notNullable()
.references('id')
.inTable('users')
.onDelete('CASCADE');
table
.specificType('tier', 'account_tier')
.notNullable()
.defaultTo('free');
table.integer('max_contracts').notNullable().defaultTo(2);
table.integer('retention_days').notNullable().defaultTo(7);
table.timestamps(true, true);
});

// contracts table
await knex.schema.createTable('contracts', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table
.uuid('account_id')
.notNullable()
.references('id')
.inTable('accounts')
.onDelete('CASCADE');
table.string('contract_address', 56).notNullable();
table.specificType('network', 'network_type').notNullable();
table.string('label', 100).nullable();
table.specificType('tags', 'text[]').nullable();
table.string('deployer_address', 56).nullable();
table.string('wasm_hash', 64).nullable();
table.integer('creation_ledger').nullable();
table.boolean('is_active').notNullable().defaultTo(true);
table.timestamps(true, true);

// Unique constraint on account_id + address + network
table.unique(['account_id', 'contract_address', 'network']);

// Index for fast address lookups
table.index('contract_address');
});
}

export async function down(knex: Knex): Promise<void> {
// Drop in reverse order
await knex.schema.dropTableIfExists('contracts');
await knex.schema.dropTableIfExists('accounts');

// Clean up types
await knex.raw('DROP TYPE IF EXISTS network_type');
await knex.raw('DROP TYPE IF EXISTS account_tier');
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
// 1. Create the base table
await knex.schema.createTable('invocation_events', (table) => {
table.bigIncrements('id').notNullable();
table.timestamp('time').notNullable();
table
.uuid('contract_id')
.notNullable()
.references('id')
.inTable('contracts')
.onDelete('CASCADE');
table.integer('ledger_sequence').notNullable();
table.string('tx_hash', 64).notNullable();
table.string('function_name', 256).nullable();
table.string('invoker_address', 56).nullable();
table.boolean('success').notNullable();

// Error details
table.string('error_type', 64).nullable();
table.string('error_code', 64).nullable();
table.text('error_raw').nullable();

// Resource metrics
table.bigInteger('cpu_instructions').nullable();
table.bigInteger('memory_bytes').nullable();
table.integer('ledger_reads').nullable();
table.integer('ledger_writes').nullable();
table.bigInteger('read_bytes').nullable();
table.bigInteger('write_bytes').nullable();
table.integer('tx_size_bytes').nullable();
table.integer('events_emitted').nullable();
table.bigInteger('fee_charged').nullable();

// Cursor for pagination/tracking
table.string('event_cursor', 128).notNullable();
});

// 2. Convert to Hypertable (TimescaleDB specific)
// Partitioned by 'time' with 1-day chunks
await knex.raw("SELECT create_hypertable('invocation_events', 'time', chunk_time_interval => INTERVAL '1 day')");

// 3. Create optimized indexes
// We use knex.raw to ensure correct DESC order on the time column
await knex.raw('CREATE INDEX idx_invocation_contract_time ON invocation_events (contract_id, time DESC)');
await knex.raw('CREATE INDEX idx_invocation_contract_func_time ON invocation_events (contract_id, function_name, time DESC)');
await knex.raw('CREATE INDEX idx_invocation_tx_hash ON invocation_events (tx_hash)');

// 4. Configure default retention policy
// Automatically drop chunks older than 7 days
await knex.raw("SELECT add_retention_policy('invocation_events', INTERVAL '7 days')");
}

export async function down(knex: Knex): Promise<void> {
// Note: In TimescaleDB, dropping the table also cleans up chunks and policies.
await knex.schema.dropTableIfExists('invocation_events');
}
29 changes: 29 additions & 0 deletions apps/api/migrations/20240416140000_create_cursor_checkpoints.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable('cursor_checkpoints', (table) => {
table.increments('id').primary();
table
.uuid('contract_id')
.notNullable()
.references('id')
.inTable('contracts')
.onDelete('CASCADE');

// Use the network_type enum created in 20240416120000_create_contract_tables.ts
table.specificType('network', 'network_type').notNullable();

table.string('last_cursor', 128).notNullable();
table.integer('last_ledger_sequence').notNullable();
table.timestamp('last_processed_at').notNullable();
table.boolean('gap_detected').defaultTo(false);
table.timestamps(true, true);

// Unique constraint to enable easy upserts for the Poller
table.unique(['contract_id', 'network']);
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTableIfExists('cursor_checkpoints');
}
75 changes: 75 additions & 0 deletions apps/api/migrations/20240416150000_create_continuous_aggregates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
// 1. Ensure TimescaleDB Toolkit is enabled (optional but recommended for percentile_agg)
// Note: This might fail if the shared library is not in the image,
// but most modern Timescale images include it.
await knex.raw('CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit CASCADE').catch(() => {
console.warn('TimescaleDB Toolkit extension not found. Percentile approximations might not work.');
});

// 2. Create 1-minute rollup
await knex.raw(`
CREATE MATERIALIZED VIEW invocations_1m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', "time") AS bucket,
contract_id,
function_name,
count(*) AS total_count,
count(*) FILTER (WHERE success = true) AS success_count,
count(*) FILTER (WHERE success = false) AS error_count,
avg(cpu_instructions) AS avg_cpu,
avg(memory_bytes) AS avg_memory,
stats_agg(cpu_instructions) AS cpu_stats, -- For mean, variance, etc.
percentile_agg(cpu_instructions) AS cpu_percentiles -- For P95
FROM invocation_events
GROUP BY bucket, contract_id, function_name;
`);

// 3. Create 1-hour rollup
await knex.raw(`
CREATE MATERIALIZED VIEW invocations_1h
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', "time") AS bucket,
contract_id,
function_name,
count(*) AS total_count,
count(*) FILTER (WHERE success = true) AS success_count,
count(*) FILTER (WHERE success = false) AS error_count,
avg(cpu_instructions) AS avg_cpu,
avg(memory_bytes) AS avg_memory,
stats_agg(cpu_instructions) AS cpu_stats,
percentile_agg(cpu_instructions) AS cpu_percentiles
FROM invocation_events
GROUP BY bucket, contract_id, function_name;
`);

// 4. Set Refresh Policies
// 1m aggregate refreshes every 1 minute with a 5-minute lag window
await knex.raw(`
SELECT add_continuous_aggregate_policy('invocations_1m',
start_offset => INTERVAL '5 minutes',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');
`);

// 1h aggregate refreshes every 10 minutes with a 1-hour lag window
await knex.raw(`
SELECT add_continuous_aggregate_policy('invocations_1h',
start_offset => INTERVAL '2 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '10 minutes');
`);
}

export async function down(knex: Knex): Promise<void> {
// Drop policies first
await knex.raw("SELECT remove_continuous_aggregate_policy('invocations_1h')").catch(() => {});
await knex.raw("SELECT remove_continuous_aggregate_policy('invocations_1m')").catch(() => {});

// Drop materialized views
await knex.raw('DROP MATERIALIZED VIEW IF EXISTS invocations_1h');
await knex.raw('DROP MATERIALIZED VIEW IF EXISTS invocations_1m');
}
7 changes: 6 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
"lint": "eslint src --ext .ts",
"test": "vitest run unit",
"test:integration": "vitest run integration",
"type-check": "tsc --noEmit"
"type-check": "tsc --noEmit",
"migrate:latest": "knex migrate:latest --knexfile knexfile.ts",
"migrate:rollback": "knex migrate:rollback --knexfile knexfile.ts",
"migrate:make": "knex migrate:make --knexfile knexfile.ts"
},
"dependencies": {
"@fastify/cors": "^9.0.0",
Expand All @@ -22,6 +25,8 @@
"@fastify/websocket": "^9.0.0",
"@watchtower/shared": "*",
"fastify": "^4.24.0",
"knex": "^3.2.9",
"pg": "^8.20.0",
"zod": "^3.22.0"
},
"devDependencies": {
Expand Down
Loading
Loading