diff --git a/db/init.sql b/db/init.sql
index fc52ff8..6055d11 100644
--- a/db/init.sql
+++ b/db/init.sql
@@ -197,6 +197,19 @@ BEGIN
 END;
 $$ LANGUAGE plpgsql;
 
+-- Reseed Queue Trigger Function
+CREATE OR REPLACE FUNCTION spec_reseed_queue_sub() RETURNS trigger AS $$
+DECLARE
+    rec RECORD;
+    payload TEXT;
+BEGIN
+    rec := NEW;
+    payload := '{"data":' || row_to_json(rec) || '}';
+    PERFORM pg_notify('spec_new_reseed_job', payload);
+    RETURN rec;
+END;
+$$ LANGUAGE plpgsql;
+
 --=======================================================
 -- SPEC SCHEMA
 --=======================================================
@@ -303,4 +316,16 @@ create table if not exists spec.frozen_tables (
 comment on table spec.frozen_tables is 'Spec: Live tables actively ignoring new updates.';
 create unique index idx_frozen_table_chain on spec.frozen_tables(table_path, chain_id);
 create index idx_frozen_tables_by_chain on spec.frozen_tables(chain_id);
-alter table spec.frozen_tables owner to spec;
\ No newline at end of file
+alter table spec.frozen_tables owner to spec;
+
+-- Reseed Queue
+create table if not exists spec.reseed_queue (
+    id serial primary key,
+    table_name character varying not null,
+    column_names character varying not null,
+    status character varying not null,
+    created_at timestamp with time zone not null default(now() at time zone 'utc')
+);
+comment on table spec.reseed_queue is 'Spec: Job queue for re-seeding live tables.';
+alter table spec.reseed_queue owner to spec;
+create trigger on_insert_reseed_queue after insert on spec.reseed_queue for each row execute function spec_reseed_queue_sub();
diff --git a/src/lib/constants.ts b/src/lib/constants.ts
index d4e7d43..05ec7d0 100644
--- a/src/lib/constants.ts
+++ b/src/lib/constants.ts
@@ -132,4 +132,5 @@ export const constants: StringKeyMap = {
     MATCH_CASE_INSENSITIVE_ADDRESSES: ['true', true].includes(
         ev('MATCH_CASE_INSENSITIVE_ADDRESSES')
     ),
+    RESEED_QUEUE_CHANNEL: 'spec_new_reseed_job',
 }
diff --git a/src/lib/db/spec/names.ts b/src/lib/db/spec/names.ts
index 4b9c1bc..649a97e 100644
--- a/src/lib/db/spec/names.ts
+++ b/src/lib/db/spec/names.ts
@@ -1,5 +1,6 @@
 export const SPEC_SCHEMA_NAME = 'spec'
 export const EVENT_CURSORS_TABLE_NAME = 'event_cursors'
+export const RESEED_QUEUE_TABLE_NAME = 'reseed_queue'
 export const LIVE_COLUMNS_TABLE_NAME = 'live_columns'
 export const LINKS_TABLE_NAME = 'links'
 export const TABLE_SUB_CURSORS_TABLE_NAME = 'table_sub_cursors'
diff --git a/src/lib/db/spec/reseedQueue.ts b/src/lib/db/spec/reseedQueue.ts
new file mode 100644
index 0000000..2624273
--- /dev/null
+++ b/src/lib/db/spec/reseedQueue.ts
@@ -0,0 +1,43 @@
+import { schema, db } from '..'
+import { RESEED_QUEUE_TABLE_NAME, SPEC_SCHEMA_NAME } from './names'
+import logger from '../../logger'
+import { unique } from '../../utils/formatters'
+
+export const reseedQueue = (tx?) => schema(SPEC_SCHEMA_NAME, tx).from(RESEED_QUEUE_TABLE_NAME)
+
+export type ReseedJob = {
+    id: number
+    tableName: string
+    columnNames: string
+    createdAt: Date
+    status: ReseedStatus
+}
+
+export enum ReseedStatus {
+    InLine = 'in-line',
+    InProgress = 'in-progress',
+    Failed = 'failed',
+}
+
+export const updateReseedJobStatusById = async (id: number, newStatus: ReseedStatus) => {
+    await reseedQueue().where({ id }).update({ status: newStatus })
+}
+
+export const deleteReseedJobById = async (id: number) => {
+    await reseedQueue().where({ id }).delete()
+}
+
+// Just delete successful seed cursors.
+export async function reseedSucceeded(ids: number | string | (string | number)[]) {
+    ids = Array.isArray(ids) ? ids : [ids]
+    if (!ids.length) return
+    try {
+        await db.transaction(async (tx) => {
+            await reseedQueue(tx)
+                .whereIn('id', unique(ids as any[]))
+                .del()
+        })
+    } catch (err) {
+        logger.error(`Error deleting reseed_queue upon success (ids=${ids.join(',')}): ${err}`)
+    }
+}
diff --git a/src/lib/db/subscriber.ts b/src/lib/db/subscriber.ts
index 7cd5204..4e303cd 100644
--- a/src/lib/db/subscriber.ts
+++ b/src/lib/db/subscriber.ts
@@ -1,5 +1,11 @@
 import { db, connectionConfig } from '.'
 import { getSpecTriggers, maybeDropTrigger, createTrigger } from './triggers'
+import {
+    ReseedJob,
+    ReseedStatus,
+    reseedSucceeded,
+    updateReseedJobStatusById,
+} from './spec/reseedQueue'
 import {
     TableSub,
     TableSubStatus,
@@ -61,6 +67,7 @@ export class TableSubscriber {
             logger.error(`Table Subscriber Error: ${err}`)
         })
     }
+    reseedJobCallback: (columns: { path: string }[]) => Promise<void>
 
     async upsertTableSubs() {
         this._upsertPgListener()
@@ -145,10 +152,44 @@ export class TableSubscriber {
             this._onTableDataChange(event)
         )
 
+        // Register event handler.
+        pgListener.notifications.on(constants.RESEED_QUEUE_CHANNEL, async (event) => {
+            const reseedJob: ReseedJob = {
+                id: event.data.id,
+                tableName: event.data.table_name,
+                columnNames: event.data.column_names,
+                createdAt: event.data.created_at,
+                status: event.data.status,
+            }
+
+            await updateReseedJobStatusById(reseedJob.id, ReseedStatus.InProgress)
+
+            const isAllColumns = reseedJob.columnNames === '*'
+
+            try {
+                if (isAllColumns) {
+                    const column = {
+                        path: `${reseedJob.tableName}.*`,
+                    }
+                    await this.reseedJobCallback([column])
+                } else {
+                    const columns = reseedJob.columnNames.split(',').map((colName) => ({
+                        path: `${reseedJob.tableName}.${colName}`,
+                    }))
+                    await this.reseedJobCallback(columns)
+                }
+                await reseedSucceeded([reseedJob.id])
+            } catch (error) {
+                logger.error(`Reseed job ${reseedJob.id} failed: ${error}`)
+                await updateReseedJobStatusById(reseedJob.id, ReseedStatus.Failed)
+            }
+        })
+
         // Actually start listening to table data changes.
         try {
             await this.pgListener.connect()
             await this.pgListener.listenTo(constants.TABLE_SUB_CHANNEL)
+            await this.pgListener.listenTo(constants.RESEED_QUEUE_CHANNEL)
         } catch (err) {
             logger.error(`Error connecting to table-subs notification channel: ${err}`)
         }
diff --git a/src/spec.ts b/src/spec.ts
index 82cd974..0524447 100644
--- a/src/spec.ts
+++ b/src/spec.ts
@@ -131,6 +131,8 @@ class Spec {
         tableSubscriber.getLiveObject = (id) => this.liveObjects[id]
         await tableSubscriber.upsertTableSubs()
 
+        tableSubscriber.reseedJobCallback = (columns) => this._upsertAndSeedLiveColumns(columns)
+
         // Connect to event/rpc message client.
         // Force run the onConnect handler if already connected.
         messageClient.client ? messageClient.onConnect() : messageClient.connect()
@@ -605,21 +607,44 @@ class Spec {
         })
     }
 
-    async _upsertAndSeedLiveColumns() {
+    async _upsertAndSeedLiveColumns(columns?: { path: string }[]) {
+        if (columns) {
+            this.hasCalledUpsertAndSeedLiveColumns = false
+        }
         let liveColumnsToSeed = []
 
         // Detect any changes with live columns or links (filterBy, uniqueBy, etc.).
+        const isAllColumns =
+            columns && columns.length === 1 && columns[0].path.split('.')[2] === '*' // NOTE(review): assumes a 3-part "schema.table.*" path — confirm tableName always includes the schema
+
+        // Upsert any new/changed live columns listed in the config.
+        // We will seed (or re-seed) all live columns that were upserted.
         const upsertLiveColumnService = new UpsertLiveColumnsService()
         try {
             await upsertLiveColumnService.perform()
             liveColumnsToSeed = upsertLiveColumnService.liveColumnsToUpsert
+
+            if (columns) {
+                liveColumnsToSeed.push(
+                    ...upsertLiveColumnService.prevLiveColumns.filter((c) => {
+                        const result = isAllColumns
+                            ? true
+                            : columns.some((col) => {
+                                  return col.path === c.columnPath
+                              })
+                        return result
+                    })
+                )
+                // remove duplicates from liveColumnsToSeed
+                liveColumnsToSeed = liveColumnsToSeed.filter((v, i, a) => a.indexOf(v) === i)
+            }
         } catch (err) {
             logger.error(`Failed to upsert live columns: ${err}`)
             liveColumnsToSeed = []
         }
 
-        const tablePathsUsingLiveObjectId = upsertLiveColumnService.tablePathsUsingLiveObjectId
-        const newLiveTablePaths = upsertLiveColumnService.newLiveTablePaths
+        const tablePathsUsingLiveObjectId: { [key: string]: Set<string> } = {}
+        const newLiveTablePaths = new Set<string>()
 
         // Get a map of unique live-object/table relations (grouping the column names).
         const uniqueLiveObjectTablePaths = {}