From ca8831ba98b5fab33405fe04b64a75770dc7ad2d Mon Sep 17 00:00:00 2001 From: Murtaza Saadat Date: Mon, 16 Oct 2023 12:52:22 -0400 Subject: [PATCH] pause resume logic --- db/init.sql | 16 ++++ src/lib/constants.ts | 1 + src/lib/db/spec/seedCursors.ts | 31 +++++++ src/lib/db/subscriber.ts | 28 +++++- src/lib/services/SeedTableService.ts | 48 +++++++++- src/lib/shared-tables/client.ts | 125 +++++++++++++++++---------- src/spec.ts | 8 +- 7 files changed, 203 insertions(+), 54 deletions(-) diff --git a/db/init.sql b/db/init.sql index 6055d11..b276d32 100644 --- a/db/init.sql +++ b/db/init.sql @@ -210,6 +210,21 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- Seed Cursor Status Trigger Function +CREATE OR REPLACE FUNCTION spec_seed_cursor_status_sub() RETURNS trigger AS $$ +DECLARE + rec RECORD; + payload TEXT; +BEGIN + IF OLD.status = 'paused' AND NEW.status = 'in-progress' THEN + rec := NEW; + payload := '{"data":' || row_to_json(rec) || '}'; + PERFORM pg_notify('spec_seed_cursor_status', payload); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + --======================================================= -- SPEC SCHEMA --======================================================= @@ -269,6 +284,7 @@ alter table spec.seed_cursors owner to spec; create trigger on_insert_seed_cursors after insert on spec.seed_cursors for each row execute function spec_table_sub('id'); create trigger on_update_seed_cursors after update on spec.seed_cursors for each row execute function spec_table_sub('id'); create trigger on_delete_seed_cursors after delete on spec.seed_cursors for each row execute function spec_table_sub('id'); +create trigger on_status_change_seed_cursors after update of status on spec.seed_cursors for each row execute function spec_seed_cursor_status_sub(); -- Migrations Table create table if not exists spec.migrations ( diff --git a/src/lib/constants.ts b/src/lib/constants.ts index 05ec7d0..2caa480 100644 --- a/src/lib/constants.ts +++ b/src/lib/constants.ts @@ -133,4 +133,5 @@ 
export const constants: StringKeyMap = { ev('MATCH_CASE_INSENSITIVE_ADDRESSES') ), RESEED_QUEUE_CHANNEL: 'spec_new_reseed_job', + SEED_CURSOR_STATUS_CHANNEL: 'spec_seed_cursor_status', } diff --git a/src/lib/db/spec/seedCursors.ts b/src/lib/db/spec/seedCursors.ts index 74a2882..3c512ef 100644 --- a/src/lib/db/spec/seedCursors.ts +++ b/src/lib/db/spec/seedCursors.ts @@ -210,3 +210,34 @@ export async function failedSeedCursorsExist(): Promise { return false } } + +export async function checkIfSeedCursorIsPaused(id: string) { + try { + const results = await seedCursors().select('status').where('id', id).limit(1) + const status = results[0]?.status + return status && status === 'paused' ? true : false + } catch (err) { + logger.error(`Error checking if seed_cursor (id=${id}) is paused: ${err}`) + return false + } +} + +export async function pauseSeedCursor(id: string) { + try { + await db.transaction(async (tx) => { + await seedCursors(tx).update('status', 'paused').where('id', id) + }) + } catch (err) { + logger.error(`Error pausing seed_cursor (id=${id}): ${err}`) + } +} + +export async function resumeSeedCursor(id: string) { + try { + await db.transaction(async (tx) => { + await seedCursors(tx).update('status', 'in-progress').where('id', id) + }) + } catch (err) { + logger.error(`Error unpausing seed_cursor (id=${id}): ${err}`) + } +} diff --git a/src/lib/db/subscriber.ts b/src/lib/db/subscriber.ts index 4e303cd..de96121 100644 --- a/src/lib/db/subscriber.ts +++ b/src/lib/db/subscriber.ts @@ -19,6 +19,7 @@ import { ResolveRecordsSpec, SeedCursorJobType, SeedCursorStatus, + SeedCursor, TriggerProcedure, } from '../types' import { tablesMeta, getRel } from './tablesMeta' @@ -68,6 +69,7 @@ export class TableSubscriber { }) } reseedJobCallback: (columns: { path: string }[]) => Promise + pauseResumeSeedJobCallback: (seedCursor: SeedCursor) => Promise async upsertTableSubs() { this._upsertPgListener() @@ -148,12 +150,29 @@ export class TableSubscriber { if
(subscribedChannels.includes(constants.TABLE_SUB_CHANNEL)) return // Register event handler. - this.pgListener.notifications.on(constants.TABLE_SUB_CHANNEL, (event) => + this.pgListener.notifications.on(constants.TABLE_SUB_CHANNEL, (event) => { this._onTableDataChange(event) - ) + }) - // Register event handler. - pgListener.notifications.on(constants.RESEED_QUEUE_CHANNEL, async (event) => { + this.pgListener.notifications.on(constants.SEED_CURSOR_STATUS_CHANNEL, async (event) => { + // get seed cursor data + const seedCursorData = event.data + const seedCursor: SeedCursor = { + id: seedCursorData.id, + job_type: seedCursorData.job_type, + spec: seedCursorData.spec, + status: seedCursorData.status, + cursor: seedCursorData.cursor, + createdAt: seedCursorData.created_at, + } + + // run the seed cursor + if (seedCursor.status === 'in-progress') { + await this.pauseResumeSeedJobCallback(seedCursor) + } + }) + + this.pgListener.notifications.on(constants.RESEED_QUEUE_CHANNEL, async (event) => { const reseedJob: ReseedJob = { id: event.data.id, tableName: event.data.table_name, @@ -188,6 +207,7 @@ export class TableSubscriber { // Actually start listening to table data changes. 
try { await this.pgListener.connect() + await this.pgListener.listenTo(constants.SEED_CURSOR_STATUS_CHANNEL) await this.pgListener.listenTo(constants.TABLE_SUB_CHANNEL) await this.pgListener.listenTo(constants.RESEED_QUEUE_CHANNEL) } catch (err) { diff --git a/src/lib/services/SeedTableService.ts b/src/lib/services/SeedTableService.ts index 5a97868..2f01315 100644 --- a/src/lib/services/SeedTableService.ts +++ b/src/lib/services/SeedTableService.ts @@ -30,7 +30,12 @@ import { QueryError } from '../errors' import { constants } from '../constants' import { tablesMeta, getRel, isColTypeArray } from '../db/tablesMeta' import chalk from 'chalk' -import { updateCursor, updateMetadata, upsertOpTrackingEntries } from '../db/spec' +import { + checkIfSeedCursorIsPaused, + updateCursor, + updateMetadata, + upsertOpTrackingEntries, +} from '../db/spec' import LRU from 'lru-cache' import { applyDefaults } from '../defaults' import { withDeadlockProtection } from '../utils/db' @@ -39,6 +44,11 @@ import messageClient from '../rpcs/messageClient' const valueSep = '__:__' +const shouldContinueWithSeedCursor = async (seedCursorId: string) => { + const isSeedCursorPaused = await checkIfSeedCursorIsPaused(seedCursorId) + return !isSeedCursorPaused +} + class SeedTableService { seedSpec: SeedSpec @@ -360,7 +370,17 @@ class SeedTableService { }, sharedErrorContext, options, - this.metadata.fromTrigger + this.metadata.fromTrigger, + 1, + async () => { + try { + const isPaused = await checkIfSeedCursorIsPaused(this.seedCursorId) + return !isPaused + } catch (err) { + console.log("we got an error checking if we're paused", err) + return false + } + } ) } catch (err) { logger.error(err) @@ -485,7 +505,17 @@ class SeedTableService { onFunctionRespData, sharedErrorContext, {}, - this.metadata.fromTrigger + this.metadata.fromTrigger, + 1, + async () => { + try { + const isPaused = await checkIfSeedCursorIsPaused(this.seedCursorId) + return !isPaused + } catch (err) { + logger.error("we got an 
error checking if we're paused", err) + return false + } + } ) } catch (err) { logger.error(err) @@ -680,7 +710,17 @@ class SeedTableService { onFunctionRespData, sharedErrorContext, {}, - this.metadata.fromTrigger + this.metadata.fromTrigger, + 1, + async () => { + try { + const isPaused = await checkIfSeedCursorIsPaused(this.seedCursorId) + return !isPaused + } catch (err) { + console.log("we got an error checking if we're paused", err) + return false + } + } ) } catch (err) { logger.error(err) diff --git a/src/lib/shared-tables/client.ts b/src/lib/shared-tables/client.ts index 0be1fe3..8c48f26 100644 --- a/src/lib/shared-tables/client.ts +++ b/src/lib/shared-tables/client.ts @@ -25,45 +25,61 @@ export async function querySharedTable( sharedErrorContext: StringKeyMap, options: SelectOptions = {}, nearHead: boolean = false, - attempt: number = 1 + attempt: number = 1, + streamShouldContinue?: () => Promise ) { - const queryPayload: StringKeyMap = { - table: tablePath, - filters: stringifyAnyDates(payload), - options, - } - if (nearHead) { - queryPayload.nearHead = true - } - - const abortController = new AbortController() - const resp = await makeRequest(tablePath, queryPayload, abortController) + // check if we should continue + const shouldContinue = streamShouldContinue ? 
await streamShouldContinue() : true - if (!isStreamingResp(resp)) { - await handleJSONResp(resp, tablePath, onData) + if (!shouldContinue) { + logger.info('aborting', shouldContinue) return - } + } else { + const queryPayload: StringKeyMap = { + table: tablePath, + filters: stringifyAnyDates(payload), + options, + } + if (nearHead) { + queryPayload.nearHead = true + } - try { - await handleStreamingResp(resp, abortController, onData, sharedErrorContext) - } catch (err) { - logger.error(`Error handling streaming response from shared table ${tablePath}: ${err}`) - if (attempt < constants.EXPO_BACKOFF_MAX_ATTEMPTS) { - logger.warn( - `Retrying with attempt ${attempt}/${constants.EXPO_BACKOFF_MAX_ATTEMPTS}...` - ) - await sleep(constants.EXPO_BACKOFF_FACTOR ** attempt * constants.EXPO_BACKOFF_DELAY) - await querySharedTable( - tablePath, - payload, + const abortController = new AbortController() + const resp = await makeRequest(tablePath, queryPayload, abortController) + + if (!isStreamingResp(resp)) { + await handleJSONResp(resp, tablePath, onData) + return + } + + try { + await handleStreamingResp( + resp, + abortController, onData, sharedErrorContext, - options || {}, - nearHead, - attempt + 1 + streamShouldContinue ) - } else { - throw err + } catch (err) { + logger.error(`Error handling streaming response from shared table ${tablePath}: ${err}`) + if (attempt < constants.EXPO_BACKOFF_MAX_ATTEMPTS) { + logger.warn( + `Retrying with attempt ${attempt}/${constants.EXPO_BACKOFF_MAX_ATTEMPTS}...` + ) + await sleep(constants.EXPO_BACKOFF_FACTOR ** attempt * constants.EXPO_BACKOFF_DELAY) + await querySharedTable( + tablePath, + payload, + onData, + sharedErrorContext, + options || {}, + nearHead, + attempt + 1, + streamShouldContinue + ) + } else { + throw err + } } } } @@ -84,7 +100,8 @@ async function handleStreamingResp( resp: Response, abortController: AbortController, onData: onDataCallbackType, - sharedErrorContext: StringKeyMap + sharedErrorContext: StringKeyMap, 
+ streamShouldContinue?: () => Promise ) { // Create JSON parser for streamed response. const jsonParser = new JSONParser({ @@ -105,20 +122,41 @@ async function handleStreamingResp( let pendingDataPromise = null - // Parse each JSON object and add it to a batch. let batch = [] - jsonParser.onValue = (obj) => { - if (!obj) return - obj = obj as StringKeyMap - if (obj.error) throw obj.error // Throw any errors explicitly passed back + try { + // Parse each JSON object and add it to a batch. + jsonParser.onValue = (obj) => { + if (!obj) return + obj = obj as StringKeyMap + if (obj.error) throw obj.error // Throw any errors explicitly passed back + + obj = camelizeKeys(obj) + + batch.push(obj) - obj = camelizeKeys(obj) + const NUMBER_TO_CHECK_FOR_PAUSE = 1000 + if (batch.length % NUMBER_TO_CHECK_FOR_PAUSE === 0) { + new Promise(async (resolve) => { + const shouldContinue = streamShouldContinue + ? await streamShouldContinue() + : true - batch.push(obj) - if (batch.length === constants.STREAMING_SEED_UPSERT_BATCH_SIZE) { - pendingDataPromise = onData([...batch]) - batch = [] + if (!shouldContinue) { + abortController.abort() + resolve('aborted') + } else { + resolve('not aborted') + } + }) + } + + if (batch.length === constants.STREAMING_SEED_UPSERT_BATCH_SIZE) { + pendingDataPromise = onData([...batch]) + batch = [] + } } + } catch (e) { + console.log('error in json parser', e) } let chunk @@ -138,7 +176,6 @@ async function handleStreamingResp( chunkTimer && clearTimeout(chunkTimer) throw `Error iterating response stream: ${err?.message || err}` } - chunkTimer && clearTimeout(chunkTimer) if (batch.length) { await onData([...batch]) diff --git a/src/spec.ts b/src/spec.ts index 0524447..0ea4e54 100644 --- a/src/spec.ts +++ b/src/spec.ts @@ -32,6 +32,7 @@ import { upsertOpTrackingEntries, freezeTablesForChainId, deleteOpsOlderThan, + checkIfSeedCursorIsPaused, } from './lib/db/spec' import { SpecEvent } from '@spec.dev/event-client' import LRU from 'lru-cache' @@ -132,6 
+133,7 @@ class Spec { await tableSubscriber.upsertTableSubs() tableSubscriber.reseedJobCallback = (columns) => this._upsertAndSeedLiveColumns(columns) + tableSubscriber.pauseResumeSeedJobCallback = () => this._onMessageClientConnected() // Connect to event/rpc message client. // Force run the onConnect handler if already connected. @@ -715,7 +717,6 @@ class Spec { deleteSeedCursorIds.push(seedCursor.id) continue } - if (seedCursor.job_type === SeedCursorJobType.SeedTable) { // Check to see if a seed spec for this liveObjectId+tablePath // is already scheduled to run (per above). @@ -1039,7 +1040,10 @@ class Spec { } } - await seedSucceeded(seedTableService.seedCursorId) + // if the seed is paused, then we don't want to register it as successful + const isPaused = await checkIfSeedCursorIsPaused(seedTableService.seedCursorId) + + !isPaused && (await seedSucceeded(seedTableService.seedCursorId)) // Run next seed cursor in series if one is registered. metadata?.nextId && this._runNextSeedCursorInSeries(metadata.nextId)