16 changes: 16 additions & 0 deletions db/init.sql
@@ -210,6 +210,21 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- Notify listeners whenever a seed cursor is resumed (paused -> in-progress).
CREATE OR REPLACE FUNCTION spec_seed_cursor_status_sub() RETURNS trigger AS $$
DECLARE
    rec RECORD;
    payload TEXT;
BEGIN
    IF OLD.status = 'paused' AND NEW.status = 'in-progress' THEN
        rec := NEW;
        -- Publish the full row as {"data": <row>} on the status channel.
        payload := '{"data":' || row_to_json(rec) || '}';
        PERFORM pg_notify('spec_seed_cursor_status', payload);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
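
The trigger above publishes the full row as a JSON payload of the shape {"data": <row>} on the spec_seed_cursor_status channel. As a rough sketch of what a consumer of that channel sees (assuming the node-postgres `pg` client; the connection details are placeholders):

import { Client } from 'pg'

async function watchSeedCursorResumes() {
    const client = new Client({ connectionString: process.env.DATABASE_URL })
    await client.connect()
    await client.query('LISTEN spec_seed_cursor_status')
    // Each resume (paused -> in-progress) arrives as one notification.
    client.on('notification', (msg) => {
        const { data } = JSON.parse(msg.payload || '{}')
        console.log(`seed cursor ${data.id} is now ${data.status}`)
    })
}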

--=======================================================
-- SPEC SCHEMA
--=======================================================
@@ -269,6 +284,7 @@ alter table spec.seed_cursors owner to spec;
create trigger on_insert_seed_cursors after insert on spec.seed_cursors for each row execute function spec_table_sub('id');
create trigger on_update_seed_cursors after update on spec.seed_cursors for each row execute function spec_table_sub('id');
create trigger on_delete_seed_cursors after delete on spec.seed_cursors for each row execute function spec_table_sub('id');
create trigger on_status_change_seed_cursors after update of status on spec.seed_cursors for each row execute function spec_seed_cursor_status_sub();

-- Migrations Table
create table if not exists spec.migrations (
1 change: 1 addition & 0 deletions src/lib/constants.ts
@@ -133,4 +133,5 @@ export const constants: StringKeyMap = {
ev('MATCH_CASE_INSENSITIVE_ADDRESSES')
),
RESEED_QUEUE_CHANNEL: 'spec_new_reseed_job',
SEED_CURSOR_STATUS_CHANNEL: 'spec_seed_cursor_status',
}
31 changes: 31 additions & 0 deletions src/lib/db/spec/seedCursors.ts
@@ -210,3 +210,34 @@ export async function failedSeedCursorsExist(): Promise<boolean> {
return false
}
}

export async function checkIfSeedCursorIsPaused(id: string): Promise<boolean> {
    try {
        const results = await seedCursors().select('status').where('id', id).limit(1)
        return results[0]?.status === 'paused'
    } catch (err) {
        logger.error(`Error checking if seed_cursor (id=${id}) is paused: ${err}`)
        return false
    }
}

export async function pauseSeedCursor(id: string) {
    try {
        await db.transaction(async (tx) => {
            // Flip the status column -- this is what checkIfSeedCursorIsPaused
            // reads and what the status-change trigger in db/init.sql watches.
            await seedCursors(tx).update('status', 'paused').where('id', id)
        })
    } catch (err) {
        logger.error(`Error pausing seed_cursor (id=${id}): ${err}`)
    }
}

export async function resumeSeedCursor(id: string) {
    try {
        await db.transaction(async (tx) => {
            // Moving paused -> in-progress fires the pg_notify trigger.
            await seedCursors(tx).update('status', 'in-progress').where('id', id)
        })
    } catch (err) {
        logger.error(`Error resuming seed_cursor (id=${id}): ${err}`)
    }
}
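
A hypothetical round trip with these helpers (the id is a placeholder; the import path assumes this module):

import { pauseSeedCursor, resumeSeedCursor, checkIfSeedCursorIsPaused } from './seedCursors'

const id = 'some-seed-cursor-id'
await pauseSeedCursor(id)
console.log(await checkIfSeedCursorIsPaused(id)) // true
await resumeSeedCursor(id) // paused -> in-progress fires the pg_notify trigger
console.log(await checkIfSeedCursorIsPaused(id)) // false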
28 changes: 24 additions & 4 deletions src/lib/db/subscriber.ts
@@ -19,6 +19,7 @@ import {
ResolveRecordsSpec,
SeedCursorJobType,
SeedCursorStatus,
SeedCursor,
TriggerProcedure,
} from '../types'
import { tablesMeta, getRel } from './tablesMeta'
@@ -68,6 +69,7 @@ export class TableSubscriber {
})
}
reseedJobCallback: (columns: { path: string }[]) => Promise<void>
pauseResumeSeedJobCallback: (seedCursor: SeedCursor) => Promise<void>

async upsertTableSubs() {
this._upsertPgListener()
@@ -148,12 +150,29 @@
if (subscribedChannels.includes(constants.TABLE_SUB_CHANNEL)) return

        // Register the table-data-change handler.
        this.pgListener.notifications.on(constants.TABLE_SUB_CHANNEL, (event) => {
            this._onTableDataChange(event)
        })

        // Handle seed cursor status changes (fired on paused -> in-progress).
        this.pgListener.notifications.on(constants.SEED_CURSOR_STATUS_CHANNEL, async (event) => {
            // Rebuild the seed cursor from the notification payload.
            const seedCursorData = event.data
            const seedCursor: SeedCursor = {
                id: seedCursorData.id,
                job_type: seedCursorData.job_type,
                spec: seedCursorData.spec,
                status: seedCursorData.status,
                cursor: seedCursorData.cursor,
                created_at: seedCursorData.created_at,
            }

            // Resume the seed job now that the cursor is back in progress.
            if (seedCursor.status === 'in-progress') {
                await this.pauseResumeSeedJobCallback(seedCursor)
            }
        })

this.pgListener.notifications.on(constants.RESEED_QUEUE_CHANNEL, async (event) => {
const reseedJob: ReseedJob = {
id: event.data.id,
tableName: event.data.table_name,
@@ -188,6 +207,7 @@
// Actually start listening to table data changes.
try {
await this.pgListener.connect()
await this.pgListener.listenTo(constants.SEED_CURSOR_STATUS_CHANNEL)
await this.pgListener.listenTo(constants.TABLE_SUB_CHANNEL)
await this.pgListener.listenTo(constants.RESEED_QUEUE_CHANNEL)
} catch (err) {
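
How the new callback gets wired is up to the consumer; one possible sketch (the tableSubscriber export and the restartSeedJobFromCursor helper are assumptions, not part of this diff):

import { tableSubscriber } from './lib/db/subscriber'

tableSubscriber.pauseResumeSeedJobCallback = async (seedCursor) => {
    // Restart the seed from wherever its cursor left off.
    logger.info(`Resuming seed job ${seedCursor.id} (${seedCursor.job_type})`)
    await restartSeedJobFromCursor(seedCursor) // hypothetical helper
}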
48 changes: 44 additions & 4 deletions src/lib/services/SeedTableService.ts
@@ -30,7 +30,12 @@ import { QueryError } from '../errors'
import { constants } from '../constants'
import { tablesMeta, getRel, isColTypeArray } from '../db/tablesMeta'
import chalk from 'chalk'
import {
    checkIfSeedCursorIsPaused,
    updateCursor,
    updateMetadata,
    upsertOpTrackingEntries,
} from '../db/spec'
import LRU from 'lru-cache'
import { applyDefaults } from '../defaults'
import { withDeadlockProtection } from '../utils/db'
@@ -39,6 +44,11 @@ import messageClient from '../rpcs/messageClient'

const valueSep = '__:__'

// Returns false once the seed cursor has been paused. Passed as the
// streamShouldContinue hook at the querySharedTable call sites below.
const shouldContinueWithSeedCursor = async (seedCursorId: string) => {
    const isSeedCursorPaused = await checkIfSeedCursorIsPaused(seedCursorId)
    return !isSeedCursorPaused
}

class SeedTableService {
seedSpec: SeedSpec

@@ -360,7 +370,17 @@
},
sharedErrorContext,
options,
            this.metadata.fromTrigger,
            1,
            // Bail out of the stream if this seed cursor gets paused mid-seed.
            async () => shouldContinueWithSeedCursor(this.seedCursorId)
)
} catch (err) {
logger.error(err)
Expand Down Expand Up @@ -485,7 +505,17 @@ class SeedTableService {
onFunctionRespData,
sharedErrorContext,
{},
            this.metadata.fromTrigger,
            1,
            // Bail out of the stream if this seed cursor gets paused mid-seed.
            async () => shouldContinueWithSeedCursor(this.seedCursorId)
)
} catch (err) {
logger.error(err)
@@ -680,7 +710,17 @@
onFunctionRespData,
sharedErrorContext,
{},
            this.metadata.fromTrigger,
            1,
            // Bail out of the stream if this seed cursor gets paused mid-seed.
            async () => shouldContinueWithSeedCursor(this.seedCursorId)
)
} catch (err) {
logger.error(err)
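
The net effect at these call sites: querySharedTable now receives a should-continue hook as its final argument. Schematically (the table path and payload are placeholders):

await querySharedTable(
    'eth.blocks',                 // tablePath (placeholder)
    { number: 1 },                // payload (placeholder)
    async (records) => { /* upsert batch */ },
    sharedErrorContext,
    {},                           // options
    false,                        // nearHead
    1,                            // attempt
    async () => shouldContinueWithSeedCursor(seedCursorId)
)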
125 changes: 81 additions & 44 deletions src/lib/shared-tables/client.ts
Expand Up @@ -25,45 +25,61 @@ export async function querySharedTable(
sharedErrorContext: StringKeyMap,
options: SelectOptions = {},
nearHead: boolean = false,
    attempt: number = 1,
    streamShouldContinue?: () => Promise<boolean>
) {
    // Check whether the caller still wants this stream (e.g. the seed cursor was paused).
    const shouldContinue = streamShouldContinue ? await streamShouldContinue() : true
    if (!shouldContinue) {
        logger.info(`Aborting shared table query for ${tablePath} -- stream paused.`)
        return
    }

    const queryPayload: StringKeyMap = {
        table: tablePath,
        filters: stringifyAnyDates(payload),
        options,
    }
    if (nearHead) {
        queryPayload.nearHead = true
    }

    const abortController = new AbortController()
    const resp = await makeRequest(tablePath, queryPayload, abortController)

    if (!isStreamingResp(resp)) {
        await handleJSONResp(resp, tablePath, onData)
        return
    }

    try {
        await handleStreamingResp(
            resp,
            abortController,
            onData,
            sharedErrorContext,
            streamShouldContinue
        )
    } catch (err) {
        logger.error(`Error handling streaming response from shared table ${tablePath}: ${err}`)
        if (attempt < constants.EXPO_BACKOFF_MAX_ATTEMPTS) {
            logger.warn(
                `Retrying with attempt ${attempt}/${constants.EXPO_BACKOFF_MAX_ATTEMPTS}...`
            )
            await sleep(constants.EXPO_BACKOFF_FACTOR ** attempt * constants.EXPO_BACKOFF_DELAY)
            await querySharedTable(
                tablePath,
                payload,
                onData,
                sharedErrorContext,
                options || {},
                nearHead,
                attempt + 1,
                streamShouldContinue
            )
        } else {
            throw err
        }
    }
}
@@ -84,7 +100,8 @@ async function handleStreamingResp(
resp: Response,
abortController: AbortController,
onData: onDataCallbackType,
    sharedErrorContext: StringKeyMap,
    streamShouldContinue?: () => Promise<boolean>
) {
// Create JSON parser for streamed response.
const jsonParser = new JSONParser({
@@ -105,20 +122,41 @@

let pendingDataPromise = null

    // Parse each JSON object and add it to a batch.
    let batch = []
    let totalParsed = 0
    jsonParser.onValue = (obj) => {
        if (!obj) return
        obj = obj as StringKeyMap
        if (obj.error) throw obj.error // Throw any errors explicitly passed back

        obj = camelizeKeys(obj)
        batch.push(obj)
        totalParsed++

        // Every N parsed records, fire off a non-blocking check of whether the
        // stream should keep going (e.g. the seed cursor may have been paused).
        const NUMBER_TO_CHECK_FOR_PAUSE = 1000
        if (streamShouldContinue && totalParsed % NUMBER_TO_CHECK_FOR_PAUSE === 0) {
            streamShouldContinue()
                .then((shouldContinue) => {
                    if (!shouldContinue) {
                        abortController.abort()
                    }
                })
                .catch((err) => logger.error(`Error checking stream pause status: ${err}`))
        }

        if (batch.length === constants.STREAMING_SEED_UPSERT_BATCH_SIZE) {
            pendingDataPromise = onData([...batch])
            batch = []
        }
    }

let chunk
@@ -138,7 +176,6 @@
chunkTimer && clearTimeout(chunkTimer)
throw `Error iterating response stream: ${err?.message || err}`
}

if (batch.length) {
await onData([...batch])