Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion db/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- Reseed Queue Trigger Function
-- Fires AFTER INSERT on spec.reseed_queue and publishes the new row as JSON
-- on the 'spec_new_reseed_job' channel (must match RESEED_QUEUE_CHANNEL in
-- src/lib/constants.ts) so the table subscriber can pick the job up.
-- NOTE: pg_notify payloads are capped at ~8000 bytes; rows here are small
-- (table name, column list, status), so that limit is not a concern.
CREATE OR REPLACE FUNCTION spec_reseed_queue_sub() RETURNS trigger AS $$
BEGIN
    -- json_build_object guarantees well-formed JSON regardless of row
    -- contents, unlike manual string concatenation of row_to_json output.
    PERFORM pg_notify('spec_new_reseed_job', json_build_object('data', row_to_json(NEW))::text);
    -- The return value of an AFTER trigger is ignored; NULL is conventional.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

--=======================================================
-- SPEC SCHEMA
--=======================================================
Expand Down Expand Up @@ -303,4 +316,16 @@ create table if not exists spec.frozen_tables (
comment on table spec.frozen_tables is 'Spec: Live tables actively ignoring new updates.';
create unique index idx_frozen_table_chain on spec.frozen_tables(table_path, chain_id);
create index idx_frozen_tables_by_chain on spec.frozen_tables(chain_id);
alter table spec.frozen_tables owner to spec;
alter table spec.frozen_tables owner to spec;

-- Reseed Queue
-- Job queue consumed by the table subscriber (via the trigger below). Each
-- row is a request to re-seed one or more live columns of a live table.
create table if not exists spec.reseed_queue (
id serial primary key,
table_name character varying not null,
-- Comma-separated list of column names, or '*' to re-seed all columns
-- (parsed by the reseed handler in src/lib/db/subscriber.ts).
column_names character varying not null,
-- One of 'in-line' | 'in-progress' | 'failed' — see the ReseedStatus enum
-- in src/lib/db/spec/reseedQueue.ts. Rows are deleted outright on success.
status character varying not null,
created_at timestamp with time zone not null default(now() at time zone 'utc')
);
comment on table spec.reseed_queue is 'Spec: Job queue for re-seeding live tables.';
alter table spec.reseed_queue owner to spec;
-- Publish every enqueued job on the spec_new_reseed_job NOTIFY channel.
create trigger on_insert_reseed_queue after insert on spec.reseed_queue for each row execute function spec_reseed_queue_sub();
1 change: 1 addition & 0 deletions src/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,5 @@ export const constants: StringKeyMap = {
MATCH_CASE_INSENSITIVE_ADDRESSES: ['true', true].includes(
ev('MATCH_CASE_INSENSITIVE_ADDRESSES')
),
RESEED_QUEUE_CHANNEL: 'spec_new_reseed_job',
}
1 change: 1 addition & 0 deletions src/lib/db/spec/names.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export const SPEC_SCHEMA_NAME = 'spec'
export const EVENT_CURSORS_TABLE_NAME = 'event_cursors'
export const RESEED_QUEUE_TABLE_NAME = 'reseed_queue'
export const LIVE_COLUMNS_TABLE_NAME = 'live_columns'
export const LINKS_TABLE_NAME = 'links'
export const TABLE_SUB_CURSORS_TABLE_NAME = 'table_sub_cursors'
Expand Down
43 changes: 43 additions & 0 deletions src/lib/db/spec/reseedQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { schema, db } from '..'
import { RESEED_QUEUE_TABLE_NAME, SPEC_SCHEMA_NAME } from './names'
import logger from '../../logger'
import { unique } from '../../utils/formatters'

// Knex query builder scoped to spec.reseed_queue, optionally bound to an open transaction `tx`.
export const reseedQueue = (tx?) => schema(SPEC_SCHEMA_NAME, tx).from(RESEED_QUEUE_TABLE_NAME)

// A row of spec.reseed_queue, camelCased. (The raw NOTIFY payload carries
// snake_case keys — table_name, column_names, created_at — and is mapped
// into this shape by the subscriber's reseed handler.)
export type ReseedJob = {
id: number
// Comma-separated column names, or '*' to re-seed every column on the table.
tableName: string
columnNames: string
createdAt: Date
status: ReseedStatus
}

// Lifecycle of a reseed job: queued ('in-line') -> 'in-progress' ->
// deleted on success (see reseedSucceeded) or marked 'failed'.
export enum ReseedStatus {
InLine = 'in-line',
InProgress = 'in-progress',
Failed = 'failed',
}

export const updateReseedJobStatusById = async (id: number, newStatus: ReseedStatus) => {
await reseedQueue().where({ id }).update({ status: newStatus })
}

// Remove a single reseed job row by its primary key.
export async function deleteReseedJobById(id: number) {
    const job = reseedQueue().where({ id })
    await job.del()
}

// Just delete successful seed cursors.
// Accepts a single id or a list of ids; duplicates are removed before the
// delete. Best-effort: failures are logged, never thrown.
export async function reseedSucceeded(ids: number | string | (string | number)[]) {
    // Normalize into a deduped array without reassigning the parameter or
    // resorting to an `as any[]` cast.
    const idList = Array.isArray(ids) ? ids : [ids]
    if (!idList.length) return
    const uniqueIds = [...new Set(idList)]
    try {
        await db.transaction(async (tx) => {
            await reseedQueue(tx).whereIn('id', uniqueIds).del()
        })
    } catch (err) {
        logger.error(`Error deleting reseed_queue upon success (ids=${uniqueIds.join(',')}): ${err}`)
    }
}
41 changes: 41 additions & 0 deletions src/lib/db/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { db, connectionConfig } from '.'
import { getSpecTriggers, maybeDropTrigger, createTrigger } from './triggers'
import {
ReseedJob,
ReseedStatus,
reseedSucceeded,
updateReseedJobStatusById,
} from './spec/reseedQueue'
import {
TableSub,
TableSubStatus,
Expand Down Expand Up @@ -61,6 +67,7 @@ export class TableSubscriber {
logger.error(`Table Subscriber Error: ${err}`)
})
}
reseedJobCallback: (columns: { path: string }[]) => Promise<void>

async upsertTableSubs() {
this._upsertPgListener()
Expand Down Expand Up @@ -145,10 +152,44 @@ export class TableSubscriber {
this._onTableDataChange(event)
)

// Register event handler.
// Handles NOTIFY payloads on the reseed-queue channel (published by the
// on_insert_reseed_queue trigger): marks the job in-progress, re-seeds the
// requested columns via reseedJobCallback, then deletes the job on success
// or marks it failed.
pgListener.notifications.on(constants.RESEED_QUEUE_CHANNEL, async (event) => {
// Map the snake_case row payload into a camelCase ReseedJob.
const reseedJob: ReseedJob = {
id: event.data.id,
tableName: event.data.table_name,
columnNames: event.data.column_names,
createdAt: event.data.created_at,
status: event.data.status,
}

// NOTE(review): this await sits outside the try/catch below — if the
// status update throws, the rejection is unhandled. Consider moving it
// inside the try (or making the helper non-throwing).
await updateReseedJobStatusById(reseedJob.id, ReseedStatus.InProgress)

// '*' means re-seed every live column on the table.
const isAllColumns = reseedJob.columnNames === '*'

try {
if (isAllColumns) {
const column = {
path: `${reseedJob.tableName}.*`,
}
await this.reseedJobCallback([column])
} else {
// Fan the comma-separated column list out into one path per column.
const columns = reseedJob.columnNames.split(',').map((colName) => ({
path: `${reseedJob.tableName}.${colName}`,
}))
await this.reseedJobCallback(columns)
}
// Success: remove the job row from the queue entirely.
await reseedSucceeded([reseedJob.id])
} catch (error) {
// NOTE(review): prefer logger.error here for consistency with the
// rest of this file's error handling.
console.error(error)
await updateReseedJobStatusById(reseedJob.id, ReseedStatus.Failed)
}
})

// Actually start listening to table data changes.
try {
await this.pgListener.connect()
await this.pgListener.listenTo(constants.TABLE_SUB_CHANNEL)
await this.pgListener.listenTo(constants.RESEED_QUEUE_CHANNEL)
} catch (err) {
logger.error(`Error connecting to table-subs notification channel: ${err}`)
}
Expand Down
31 changes: 28 additions & 3 deletions src/spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class Spec {
tableSubscriber.getLiveObject = (id) => this.liveObjects[id]
await tableSubscriber.upsertTableSubs()

tableSubscriber.reseedJobCallback = (columns) => this._upsertAndSeedLiveColumns(columns)

// Connect to event/rpc message client.
// Force run the onConnect handler if already connected.
messageClient.client ? messageClient.onConnect() : messageClient.connect()
Expand Down Expand Up @@ -605,21 +607,44 @@ class Spec {
})
}

async _upsertAndSeedLiveColumns() {
async _upsertAndSeedLiveColumns(columns?: { path: string }[]) {
if (columns) {
this.hasCalledUpsertAndSeedLiveColumns = false
}
let liveColumnsToSeed = []

// Detect any changes with live columns or links (filterBy, uniqueBy, etc.).
const isAllColumns =
columns && columns.length === 1 && columns[0].path.split('.')[2] === '*'

// Upsert any new/changed live columns listed in the config.
// We will seed (or re-seed) all live columns that were upserted.
const upsertLiveColumnService = new UpsertLiveColumnsService()
try {
await upsertLiveColumnService.perform()
liveColumnsToSeed = upsertLiveColumnService.liveColumnsToUpsert

if (columns) {
liveColumnsToSeed.push(
...upsertLiveColumnService.prevLiveColumns.filter((c) => {
const result = isAllColumns
? true
: columns.some((col) => {
return col.path === c.columnPath
})
return result
})
)
// remove duplicates from liveColumnsToSeed
liveColumnsToSeed = liveColumnsToSeed.filter((v, i, a) => a.indexOf(v) === i)
}
} catch (err) {
logger.error(`Failed to upsert live columns: ${err}`)
liveColumnsToSeed = []
}

const tablePathsUsingLiveObjectId = upsertLiveColumnService.tablePathsUsingLiveObjectId
const newLiveTablePaths = upsertLiveColumnService.newLiveTablePaths
const tablePathsUsingLiveObjectId: { [key: string]: Set<string> } = {}
const newLiveTablePaths = new Set<string>()

// Get a map of unique live-object/table relations (grouping the column names).
const uniqueLiveObjectTablePaths = {}
Expand Down