diff --git a/.changeset/tame-islands-deliver.md b/.changeset/tame-islands-deliver.md new file mode 100644 index 000000000..3846e4d73 --- /dev/null +++ b/.changeset/tame-islands-deliver.md @@ -0,0 +1,8 @@ +--- +'@powersync/common': minor +'@powersync/node': minor +'@powersync/react-native': minor +'@powersync/web': minor +--- + +Added SQLite trigger based table change tracking. diff --git a/docs/.gitignore b/docs/.gitignore index c32a3c8c3..91a9f9eba 100644 --- a/docs/.gitignore +++ b/docs/.gitignore @@ -25,5 +25,6 @@ docs/react-sdk/ docs/vue-sdk/ docs/web-sdk/ docs/tanstack-react-query-sdk +docs/node-sdk .env diff --git a/docs/docusaurus.config.ts b/docs/docusaurus.config.ts index 8e2de3fc0..64a0446eb 100644 --- a/docs/docusaurus.config.ts +++ b/docs/docusaurus.config.ts @@ -1,10 +1,10 @@ -import { themes as prismThemes } from 'prism-react-renderer'; -import type { TypeDocOptionMap } from 'typedoc'; -import type { Config } from '@docusaurus/types'; import type * as Preset from '@docusaurus/preset-classic'; +import type { Config } from '@docusaurus/types'; import type { PluginOptions } from 'docusaurus-plugin-typedoc'; -import { DOC_FOLDER, packageMap } from './utils/packageMap'; import 'dotenv/config'; +import { themes as prismThemes } from 'prism-react-renderer'; +import type { TypeDocOptionMap } from 'typedoc'; +import { DOC_FOLDER, packageMap } from './utils/packageMap'; const PROJECT_NAME = process.env.GH_PROJECT_NAME; @@ -155,8 +155,8 @@ const config: Config = { darkTheme: prismThemes.dracula }, future: { - experimental_faster: true, - }, + experimental_faster: true + } } satisfies Preset.ThemeConfig }; diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index f5b0397da..9b20b5da9 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -35,6 +35,8 @@ import { type PowerSyncConnectionOptions, type RequiredAdditionalConnectionOptions } from './sync/stream/AbstractStreamingSyncImplementation.js'; +import { TriggerManager } from './triggers/TriggerManager.js'; +import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js'; import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js'; import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js'; import { WatchedQueryComparator } from './watched/processors/comparators.js'; @@ -191,6 +193,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { @@ -252,7 +261,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver('SELECT * FROM DIFF'); + * diff.forEach(diff => console.log(diff.operation, diff.timestamp, JSON.parse(diff.value))) + * ``` + */ +export type TriggerDiffRecord = TriggerDiffUpdateRecord | TriggerDiffInsertRecord | TriggerDiffDeleteRecord; + +/** + * @experimental + * Querying the DIFF table directly with {@link TriggerDiffHandlerContext#withExtractedDiff} will return records + * with the tracked columns extracted from the JSON value. + * This type represents the structure of such records. + * @example + * ```typescript + * const diffs = await context.withExtractedDiff>('SELECT * FROM DIFF'); + * diff.forEach(diff => console.log(diff.__operation, diff.__timestamp, diff.columnName)) + * ``` + */ +export type ExtractedTriggerDiffRecord = T & { + [K in keyof Omit as `__${string & K}`]: TriggerDiffRecord[K]; +} & { + __previous_value?: string; +}; + +/** + * @experimental + * Hooks used in the creation of a table diff trigger. + */ +export interface TriggerCreationHooks { + /** + * Executed inside a write lock before the trigger is created. + */ + beforeCreate?: (context: LockContext) => Promise; +} + +/** + * Common interface for options used in creating a diff trigger. + */ + +interface BaseCreateDiffTriggerOptions { + /** + * PowerSync source table/view to trigger and track changes from. + * This should be present in the PowerSync database's schema. + */ + source: string; + + /** + * Columns to track and report changes for. + * Defaults to all columns in the source table. + * Use an empty array to track only the ID and operation. + */ + columns?: string[]; + + /** + * Condition to filter when the triggers should fire. + * This corresponds to a SQLite [WHEN](https://sqlite.org/lang_createtrigger.html) clause in the trigger body. + * This is useful for only triggering on specific conditions. + * For example, you can use it to only trigger on certain values in the NEW row. + * Note that for PowerSync the row data is stored in a JSON column named `data`. + * The row id is available in the `id` column. + * + * NB! The WHEN clauses here are added directly to the SQLite trigger creation SQL. + * Any user input strings here should be sanitized externally. The {@link when} string template function performs + * some basic sanitization, extra external sanitization is recommended. + * + * @example + * { + * 'INSERT': sanitizeSQL`json_extract(NEW.data, '$.list_id') = ${sanitizeUUID(list.id)}`, + * 'INSERT': `TRUE`, + * 'UPDATE': sanitizeSQL`NEW.id = 'abcd' AND json_extract(NEW.data, '$.status') = 'active'`, + * 'DELETE': sanitizeSQL`json_extract(OLD.data, '$.list_id') = 'abcd'` + * } + */ + when: Partial>; + + /** + * Hooks which allow execution during the trigger creation process. + */ + hooks?: TriggerCreationHooks; +} + +/** + * @experimental + * Options for {@link TriggerManager#createDiffTrigger}. + */ +export interface CreateDiffTriggerOptions extends BaseCreateDiffTriggerOptions { + /** + * Destination table to send changes to. + * This table is created internally as a SQLite temporary table. + * This table will be dropped once the trigger is removed. + */ + destination: string; +} + +/** + * @experimental + * Callback to drop a trigger after it has been created. + */ +export type TriggerRemoveCallback = () => Promise; + +/** + * @experimental + * Context for the `onChange` handler provided to {@link TriggerManager#trackTableDiff}. + */ +export interface TriggerDiffHandlerContext extends LockContext { + /** + * The name of the temporary destination table created by the trigger. + */ + destinationTable: string; + + /** + * Allows querying the database with access to the table containing DIFF records. + * The diff table is accessible via the `DIFF` accessor. + * + * The `DIFF` table is of the form described in {@link TriggerManager#createDiffTrigger} + * ```sql + * CREATE TEMP DIFF ( + * id TEXT, + * operation TEXT, + * timestamp TEXT + * value TEXT, + * previous_value TEXT + * ); + * ``` + * + * Note that the `value` and `previous_value` columns store the row state in JSON string format. + * To access the row state in an extracted form see {@link TriggerDiffHandlerContext#withExtractedDiff}. + * + * @example + * ```sql + * --- This fetches the current state of `todo` rows which have a diff operation present. + * --- The state of the row at the time of the operation is accessible in the DIFF records. + * SELECT + * todos.* + * FROM + * DIFF + * JOIN todos ON DIFF.id = todos.id + * WHERE json_extract(DIFF.value, '$.status') = 'active' + * ``` + */ + withDiff: (query: string, params?: ReadonlyArray>) => Promise; + + /** + * Allows querying the database with access to the table containing diff records. + * The diff table is accessible via the `DIFF` accessor. + * + * This is similar to {@link withDiff} but extracts the row columns from the tracked JSON value. The diff operation + * data is aliased as `__` columns to avoid column conflicts. + * + * For {@link DiffTriggerOperation#DELETE} operations the previous_value columns are extracted for convenience. + * + * + * ```sql + * CREATE TEMP TABLE DIFF ( + * id TEXT, + * replicated_column_1 COLUMN_TYPE, + * replicated_column_2 COLUMN_TYPE, + * __operation TEXT, + * __timestamp TEXT, + * __previous_value TEXT + * ); + * ``` + * + * @example + * ```sql + * SELECT + * todos.* + * FROM + * DIFF + * JOIN todos ON DIFF.id = todos.id + * --- The todo column names are extracted from json and are available as DIFF.name + * WHERE DIFF.name = 'example' + * ``` + */ + withExtractedDiff: (query: string, params?: ReadonlyArray>) => Promise; +} + +/** + * @experimental + * Options for tracking changes to a table with {@link TriggerManager#trackTableDiff}. + */ +export interface TrackDiffOptions extends BaseCreateDiffTriggerOptions { + /** + * Handler for processing diff operations. + * Automatically invoked once diff items are present. + * Diff items are automatically cleared after the handler is invoked. + */ + onChange: (context: TriggerDiffHandlerContext) => Promise; + + /** + * The minimum interval, in milliseconds, between {@link onChange} invocations. + * @default {@link DEFAULT_WATCH_THROTTLE_MS} + */ + throttleMs?: number; +} + +/** + * @experimental + */ +export interface TriggerManager { + /** + * @experimental + * Creates a temporary trigger which tracks changes to a source table + * and writes changes to a destination table. + * The temporary destination table is created internally and will be dropped when the trigger is removed. + * The temporary destination table is created with the structure: + * + * ```sql + * CREATE TEMP TABLE ${destination} ( + * id TEXT, + * operation TEXT, + * timestamp TEXT + * value TEXT, + * previous_value TEXT + * ); + * ``` + * The `value` column contains the JSON representation of the row's value at the change. + * + * For {@link DiffTriggerOperation#UPDATE} operations the `previous_value` column contains the previous value of the changed row + * in a JSON format. + * + * NB: The triggers created by this method might be invalidated by {@link AbstractPowerSyncDatabase#updateSchema} calls. + * These triggers should manually be dropped and recreated when updating the schema. + * + * @returns A callback to remove the trigger and drop the destination table. + * + * @example + * ```javascript + * const dispose = await database.triggers.createDiffTrigger({ + * source: 'lists', + * destination: 'ps_temp_lists_diff', + * columns: ['name'], + * when: { + * [DiffTriggerOperation.INSERT]: 'TRUE', + * [DiffTriggerOperation.UPDATE]: 'TRUE', + * [DiffTriggerOperation.DELETE]: 'TRUE' + * } + * }); + * ``` + */ + createDiffTrigger(options: CreateDiffTriggerOptions): Promise; + + /** + * @experimental + * Tracks changes for a table. Triggering a provided handler on changes. + * Uses {@link createDiffTrigger} internally to create a temporary destination table. + * + * @returns A callback to cleanup the trigger and stop tracking changes. + * + * NB: The triggers created by this method might be invalidated by {@link AbstractPowerSyncDatabase#updateSchema} calls. + * These triggers should manually be dropped and recreated when updating the schema. + * + * @example + * ```javascript + * const dispose = database.triggers.trackTableDiff({ + * source: 'todos', + * columns: ['list_id'], + * when: { + * [DiffTriggerOperation.INSERT]: sanitizeSQL`json_extract(NEW.data, '$.list_id') = ${sanitizeUUID(someIdVariable)}` + * }, + * onChange: async (context) => { + * // Fetches the todo records that were inserted during this diff + * const newTodos = await context.getAll(` + * SELECT + * todos.* + * FROM + * DIFF + * JOIN todos ON DIFF.id = todos.id + * `); + * + * // Process newly created todos + * }, + * hooks: { + * beforeCreate: async (lockContext) => { + * // This hook is executed inside the write lock before the trigger is created. + * // It can be used to synchronize the current state of the table with processor logic. + * // Any changes after this callback are guaranteed to trigger the `onChange` handler. + * + * // Read the current state of the todos table + * const currentTodos = await lockContext.getAll( + * ` + * SELECT + * * + * FROM + * todos + * WHERE + * list_id = ? + * `, + * ['123'] + * ); + * + * // Process existing todos + * } + * } + * }); + * ``` + */ + trackTableDiff(options: TrackDiffOptions): Promise; +} diff --git a/packages/common/src/client/triggers/TriggerManagerImpl.ts b/packages/common/src/client/triggers/TriggerManagerImpl.ts new file mode 100644 index 000000000..dc9938ff1 --- /dev/null +++ b/packages/common/src/client/triggers/TriggerManagerImpl.ts @@ -0,0 +1,314 @@ +import { LockContext } from '../../db/DBAdapter.js'; +import { Schema } from '../../db/schema/Schema.js'; +import { type AbstractPowerSyncDatabase } from '../AbstractPowerSyncDatabase.js'; +import { DEFAULT_WATCH_THROTTLE_MS } from '../watched/WatchedQuery.js'; +import { + CreateDiffTriggerOptions, + DiffTriggerOperation, + TrackDiffOptions, + TriggerManager, + TriggerRemoveCallback +} from './TriggerManager.js'; + +export type TriggerManagerImplOptions = { + db: AbstractPowerSyncDatabase; + schema: Schema; +}; + +export class TriggerManagerImpl implements TriggerManager { + protected schema: Schema; + + constructor(protected options: TriggerManagerImplOptions) { + this.schema = options.schema; + options.db.registerListener({ + schemaChanged: (schema) => { + this.schema = schema; + } + }); + } + + protected get db() { + return this.options.db; + } + + protected async getUUID() { + const { id: uuid } = await this.db.get<{ id: string }>(/* sql */ ` + SELECT + uuid () as id + `); + + // Replace dashes with underscores for SQLite table/trigger name compatibility + return uuid.replace(/-/g, '_'); + } + + protected async removeTriggers(tx: LockContext, triggerIds: string[]) { + for (const triggerId of triggerIds) { + await tx.execute(/* sql */ `DROP TRIGGER IF EXISTS ${triggerId}; `); + } + } + + async createDiffTrigger(options: CreateDiffTriggerOptions) { + await this.db.waitForReady(); + const { source, destination, columns, when, hooks } = options; + const operations = Object.keys(when) as DiffTriggerOperation[]; + if (operations.length == 0) { + throw new Error('At least one WHEN operation must be specified for the trigger.'); + } + + const whenClauses = Object.fromEntries( + Object.entries(when).map(([operation, filter]) => [operation, `WHEN ${filter}`]) + ); + + /** + * Allow specifying the View name as the source. + * We can lookup the internal table name from the schema. + */ + const sourceDefinition = this.schema.tables.find((table) => table.viewName == source); + if (!sourceDefinition) { + throw new Error(`Source table or view "${source}" not found in the schema.`); + } + + const replicatedColumns = columns ?? sourceDefinition.columns.map((col) => col.name); + + const internalSource = sourceDefinition.internalName; + const triggerIds: string[] = []; + + const id = await this.getUUID(); + + /** + * We default to replicating all columns if no columns array is provided. + */ + const jsonFragment = (source: 'NEW' | 'OLD' = 'NEW') => { + if (columns == null) { + // Track all columns + return `${source}.data`; + } else if (columns.length == 0) { + // Don't track any columns except for the id + return `'{}'`; + } else { + // Filter the data by the replicated columns + return `json_object(${replicatedColumns.map((col) => `'${col}', json_extract(${source}.data, '$.${col}')`).join(', ')})`; + } + }; + + const disposeWarningListener = this.db.registerListener({ + schemaChanged: () => { + this.db.logger.warn( + `The PowerSync schema has changed while previously configured triggers are still operational. This might cause unexpected results.` + ); + } + }); + + /** + * Declare the cleanup function early since if any of the init steps fail, + * we need to ensure we can cleanup the created resources. + * We unfortunately cannot rely on transaction rollback. + */ + const cleanup = async () => { + disposeWarningListener(); + return this.db.writeLock(async (tx) => { + await this.removeTriggers(tx, triggerIds); + await tx.execute(/* sql */ `DROP TABLE IF EXISTS ${destination};`); + }); + }; + + const setup = async (tx: LockContext) => { + // Allow user code to execute in this lock context before the trigger is created. + await hooks?.beforeCreate?.(tx); + await tx.execute(/* sql */ ` + CREATE TEMP TABLE ${destination} ( + id TEXT, + operation TEXT, + timestamp TEXT, + value TEXT, + previous_value TEXT + ); + `); + + if (operations.includes(DiffTriggerOperation.INSERT)) { + const insertTriggerId = `ps_temp_trigger_insert_${id}`; + triggerIds.push(insertTriggerId); + + await tx.execute(/* sql */ ` + CREATE TEMP TRIGGER ${insertTriggerId} AFTER INSERT ON ${internalSource} ${whenClauses[ + DiffTriggerOperation.INSERT + ]} BEGIN + INSERT INTO + ${destination} (id, operation, timestamp, value) + VALUES + ( + NEW.id, + 'INSERT', + strftime ('%Y-%m-%dT%H:%M:%fZ', 'now'), + ${jsonFragment('NEW')} + ); + + END; + `); + } + + if (operations.includes(DiffTriggerOperation.UPDATE)) { + const updateTriggerId = `ps_temp_trigger_update_${id}`; + triggerIds.push(updateTriggerId); + + await tx.execute(/* sql */ ` + CREATE TEMP TRIGGER ${updateTriggerId} AFTER + UPDATE ON ${internalSource} ${whenClauses[DiffTriggerOperation.UPDATE]} BEGIN + INSERT INTO + ${destination} (id, operation, timestamp, value, previous_value) + VALUES + ( + NEW.id, + 'UPDATE', + strftime ('%Y-%m-%dT%H:%M:%fZ', 'now'), + ${jsonFragment('NEW')}, + ${jsonFragment('OLD')} + ); + + END; + `); + } + + if (operations.includes(DiffTriggerOperation.DELETE)) { + const deleteTriggerId = `ps_temp_trigger_delete_${id}`; + triggerIds.push(deleteTriggerId); + + // Create delete trigger for basic JSON + await tx.execute(/* sql */ ` + CREATE TEMP TRIGGER ${deleteTriggerId} AFTER DELETE ON ${internalSource} ${whenClauses[ + DiffTriggerOperation.DELETE + ]} BEGIN + INSERT INTO + ${destination} (id, operation, timestamp, value) + VALUES + ( + OLD.id, + 'DELETE', + strftime ('%Y-%m-%dT%H:%M:%fZ', 'now'), + ${jsonFragment('OLD')} + ); + + END; + `); + } + }; + + try { + await this.db.writeLock(setup); + return cleanup; + } catch (error) { + try { + await cleanup(); + } catch (cleanupError) { + throw new AggregateError([error, cleanupError], 'Error during operation and cleanup'); + } + throw error; + } + } + + async trackTableDiff(options: TrackDiffOptions): Promise { + const { source, when, columns, hooks, throttleMs = DEFAULT_WATCH_THROTTLE_MS } = options; + + await this.db.waitForReady(); + + /** + * Allow specifying the View name as the source. + * We can lookup the internal table name from the schema. + */ + const sourceDefinition = this.schema.tables.find((table) => table.viewName == source); + if (!sourceDefinition) { + throw new Error(`Source table or view "${source}" not found in the schema.`); + } + + // The columns to present in the onChange context methods. + // If no array is provided, we use all columns from the source table. + const contextColumns = columns ?? sourceDefinition.columns.map((col) => col.name); + + const id = await this.getUUID(); + const destination = `ps_temp_track_${source}_${id}`; + + // register an onChange before the trigger is created + const abortController = new AbortController(); + const abortOnChange = () => abortController.abort(); + this.db.onChange( + { + // Note that the onChange events here have their execution scheduled. + // Callbacks are throttled and are sequential. + onChange: async () => { + if (abortController.signal.aborted) return; + + // Run the handler in a write lock to keep the state of the + // destination table consistent. + await this.db.writeTransaction(async (tx) => { + const callbackResult = await options.onChange({ + ...tx, + destinationTable: destination, + withDiff: async (query, params) => { + // Wrap the query to expose the destination table + const wrappedQuery = /* sql */ ` + WITH + DIFF AS ( + SELECT + * + FROM + ${destination} + ORDER BY + timestamp ASC + ) ${query} + `; + return tx.getAll(wrappedQuery, params); + }, + withExtractedDiff: async (query, params) => { + // Wrap the query to expose the destination table + const wrappedQuery = /* sql */ ` + WITH + DIFF AS ( + SELECT + id, + ${contextColumns.length > 0 + ? `${contextColumns.map((col) => `json_extract(value, '$.${col}') as ${col}`).join(', ')},` + : ''} operation as __operation, + timestamp as __timestamp, + previous_value as __previous_value + FROM + ${destination} + ORDER BY + __timestamp ASC + ) ${query} + `; + return tx.getAll(wrappedQuery, params); + } + }); + + // Clear the destination table after processing + await tx.execute(/* sql */ `DELETE FROM ${destination};`); + return callbackResult; + }); + } + }, + { tables: [destination], signal: abortController.signal, throttleMs } + ); + + try { + const removeTrigger = await this.createDiffTrigger({ + source, + destination, + columns: contextColumns, + when, + hooks + }); + + return async () => { + abortOnChange(); + await removeTrigger(); + }; + } catch (error) { + try { + abortOnChange(); + } catch (cleanupError) { + throw new AggregateError([error, cleanupError], 'Error during operation and cleanup'); + } + throw error; + } + } +} diff --git a/packages/common/src/client/triggers/sanitizeSQL.ts b/packages/common/src/client/triggers/sanitizeSQL.ts new file mode 100644 index 000000000..9990508a3 --- /dev/null +++ b/packages/common/src/client/triggers/sanitizeSQL.ts @@ -0,0 +1,66 @@ +function sanitizeString(input: string): string { + return `'${input.replace(/'/g, "''")}'`; +} +/** + * Helper function for sanitizing UUID input strings. + * Typically used with {@link sanitizeSQL}. + */ +export function sanitizeUUID(uuid: string): string { + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + const isValid = uuidRegex.test(uuid); + if (!isValid) { + throw new Error(`${uuid} is not a valid UUID`); + } + return uuid; +} + +/** + * SQL string template function for {@link TrackDiffOptions#when} and {@link CreateDiffTriggerOptions#when}. + * + * This function performs basic string interpolation for SQLite WHEN clauses. + * + * **String placeholders:** + * - All string values passed as placeholders are automatically wrapped in single quotes (`'`). + * - Do not manually wrap placeholders in single quotes in your template string; the function will handle quoting and escaping for you. + * - Any single quotes within the string value are escaped by doubling them (`''`), as required by SQL syntax. + * + * **Other types:** + * - `null` and `undefined` are converted to SQL `NULL`. + * - Objects are stringified using `JSON.stringify()` and wrapped in single quotes, with any single quotes inside the stringified value escaped. + * - Numbers and other primitive types are inserted directly. + * + * **Usage example:** + * ```typescript + * const myID = "O'Reilly"; + * const clause = sanitizeSQL`New.id = ${myID}`; + * // Result: "New.id = 'O''Reilly'" + * ``` + * + * Avoid manually quoting placeholders: + * ```typescript + * // Incorrect: + * sanitizeSQL`New.id = '${myID}'` // Produces double quotes: New.id = ''O''Reilly'' + * ``` + */ +export function sanitizeSQL(strings: TemplateStringsArray, ...values: any[]): string { + let result = ''; + strings.forEach((str, i) => { + result += str; + if (i < values.length) { + // For SQL, escape single quotes in string values + const value = values[i]; + if (typeof value == 'string') { + result += sanitizeString(value); + } else if (value == null) { + result += 'NULL'; + } else if (typeof value == 'object') { + // Stringify the object and escape single quotes in the result + const stringified = JSON.stringify(value); + result += sanitizeString(stringified); + } else { + result += value; + } + } + }); + return result; +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 1e839d69f..2e886155d 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -32,6 +32,8 @@ export * from './db/schema/Table.js'; export * from './db/schema/TableV2.js'; export * from './client/Query.js'; +export * from './client/triggers/sanitizeSQL.js'; +export * from './client/triggers/TriggerManager.js'; export * from './client/watched/GetAllQuery.js'; export * from './client/watched/processors/AbstractQueryProcessor.js'; export * from './client/watched/processors/comparators.js'; diff --git a/packages/common/tests/sql.test.ts b/packages/common/tests/sql.test.ts new file mode 100644 index 000000000..5c8e4ad02 --- /dev/null +++ b/packages/common/tests/sql.test.ts @@ -0,0 +1,41 @@ +import { describe, expect, it } from 'vitest'; +import { sanitizeSQL } from '../src/client/triggers/sanitizeSQL.js'; +describe('SQL', () => { + describe('sanitization', () => { + it('should sanitize quoted strings', () => { + expect(sanitizeSQL`New.id = ${"O'Reilly"}`).toBe("New.id = 'O''Reilly'"); + }); + + it('should handle null and undefined', () => { + expect(sanitizeSQL`val = ${null}`).toBe('val = NULL'); + expect(sanitizeSQL`val = ${undefined}`).toBe('val = NULL'); + }); + + it('should handle numbers', () => { + expect(sanitizeSQL`age = ${42}`).toBe('age = 42'); + expect(sanitizeSQL`price = ${3.14}`).toBe('price = 3.14'); + }); + + it('should handle objects', () => { + expect(sanitizeSQL`data = ${{ foo: 'bar' }}`).toBe(`data = '{"foo":"bar"}'`); + }); + + it('should escape single quotes in stringified objects', () => { + const obj = { foo: "O'Reilly" }; + const clause = sanitizeSQL`data = ${obj}`; + expect(clause).toBe(`data = '{"foo":"O''Reilly"}'`); + }); + + it('should interpolate multiple values', () => { + const name = 'Alice'; + const age = 30; + const clause = sanitizeSQL`name = ${name} AND age = ${age}`; + expect(clause).toBe("name = 'Alice' AND age = 30"); + }); + + it('should stringify arrays', () => { + expect(sanitizeSQL`arr = ${[1, 2, 3]}`).toBe(`arr = '[1,2,3]'`); + expect(sanitizeSQL`arr = ${['a', "O'Reilly", null]}`).toBe(`arr = '["a","O''Reilly",null]'`); + }); + }); +}); diff --git a/packages/node/tests/trigger.test.ts b/packages/node/tests/trigger.test.ts new file mode 100644 index 000000000..8f6b46876 --- /dev/null +++ b/packages/node/tests/trigger.test.ts @@ -0,0 +1,608 @@ +import { + column, + DiffTriggerOperation, + ExtractedTriggerDiffRecord, + sanitizeSQL, + sanitizeUUID, + Schema, + Table, + TriggerDiffRecord +} from '@powersync/common'; +import { describe, expect, vi } from 'vitest'; +import { Database, databaseTest } from './utils'; + +describe('Triggers', () => { + /** + * Tests a diff trigger for a table. + * The triggered results are watched manually. + */ + databaseTest('Diff triggers should track table changes', async ({ database }) => { + const tempTable = 'temp_remote_lists'; + + const filteredColumns: Array = ['content']; + await database.triggers.createDiffTrigger({ + source: 'todos', + destination: tempTable, + columns: filteredColumns, + when: { + [DiffTriggerOperation.INSERT]: 'TRUE', + [DiffTriggerOperation.UPDATE]: 'TRUE', + [DiffTriggerOperation.DELETE]: 'TRUE' + } + }); + + const results = [] as TriggerDiffRecord[]; + + database.onChange( + { + // This callback async processed. Invocations are sequential. + onChange: async () => { + await database.writeLock(async (tx) => { + const changes = await tx.getAll(/* sql */ ` + SELECT + * + FROM + ${tempTable} + `); + results.push(...changes); + // Clear the temp table after processing + await tx.execute(/* sql */ ` DELETE FROM ${tempTable}; `); + }); + } + }, + { tables: [tempTable] } + ); + + // Do some changes to the source table + const initialContent = 'test todo'; + await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?);', [initialContent]); + await database.execute(`UPDATE todos SET content = 'wooo'`); + const updatedContent = 'wooo'; + await database.execute('DELETE FROM todos WHERE content = ?', [updatedContent]); + + // Wait for the changes to be processed and results to be collected + await vi.waitFor( + () => { + expect(results.length).toEqual(3); + + expect(results[0].operation).toEqual(DiffTriggerOperation.INSERT); + const parsedInsert = JSON.parse(results[0].value); + // only the filtered columns should be tracked + expect(Object.keys(parsedInsert)).deep.eq(filteredColumns); + expect(parsedInsert.content).eq(initialContent); + + const updateRaw = results[1]; + expect(updateRaw).toBeDefined(); + expect(updateRaw.operation).toEqual(DiffTriggerOperation.UPDATE); + if (updateRaw.operation == DiffTriggerOperation.UPDATE) { + // The `if` just exposes the type correctly + expect(JSON.parse(updateRaw.value).content).eq(updatedContent); + expect(JSON.parse(updateRaw.previous_value).content).eq(initialContent); + } + + expect(results[2].operation).toEqual(DiffTriggerOperation.DELETE); + expect(JSON.parse(results[2].value).content).eq(updatedContent); + }, + { timeout: 1000 } + ); + }); + + /** + * Uses the automatic handlers for triggers to track changes. + */ + databaseTest('Should be able to track table inserts', async ({ database }) => { + await database.execute( + /* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), ?), + (uuid (), ?) RETURNING * + `, + ['test list 1', 'test list 2'] + ); + + const [firstList, secondList] = await database.getAll(/* sql */ ` + SELECT + * + FROM + lists + `); + + const results: Database['todos'][] = []; + + /** + * Watch the todos table for changes. Only track the diff for rows belonging to the first list. + */ + await database.triggers.trackTableDiff({ + source: 'todos', + columns: ['list_id'], + when: { + [DiffTriggerOperation.INSERT]: sanitizeSQL`json_extract(NEW.data, '$.list_id') = ${sanitizeUUID(firstList.id)}` + }, + onChange: async (context) => { + // Fetches the current state of todo records that were inserted during this diff window. + const newTodos = await context.withDiff(/* sql */ ` + SELECT + todos.* + FROM + DIFF + JOIN todos ON DIFF.id = todos.id + `); + + results.push(...newTodos); + } + }); + + // Create todos for both lists + await database.execute( + /* sql */ ` + INSERT INTO + todos (id, content, list_id) + VALUES + (uuid (), 'todo 1', ?), + (uuid (), 'todo 2', ?); + `, + [firstList.id, secondList.id] + ); + + // Wait for the changes to be processed and results to be collected + // We should only get a result for the first list. + await vi.waitFor( + () => { + expect(results.length).toEqual(1); + }, + { timeout: 1000 } + ); + + // Do further inserts + // Create todos for both lists + await database.execute( + /* sql */ ` + INSERT INTO + todos (id, content, list_id) + VALUES + (uuid (), 'todo 1', ?), + (uuid (), 'todo 2', ?); + `, + [firstList.id, secondList.id] + ); + + await vi.waitFor( + () => { + expect(results.length).toEqual(2); + }, + { timeout: 1000 } + ); + }); + + databaseTest('Should be able to track table updates', async ({ database }) => { + const { rows } = await database.execute( + /* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), ?) RETURNING * + `, + ['test list 1'] + ); + + const list = rows!.item(0) as Database['lists']; + + const changes: ExtractedTriggerDiffRecord[] = []; + + /** + * Watch the todos table for changes. Only track the diff for rows belonging to the first list. + */ + await database.triggers.trackTableDiff({ + source: 'lists', + when: { + [DiffTriggerOperation.UPDATE]: sanitizeSQL`NEW.id = ${sanitizeUUID(list.id)}`, + [DiffTriggerOperation.DELETE]: 'TRUE' + }, + onChange: async (context) => { + // Fetches the todo records that were inserted during this diff + const diffs = await context.withExtractedDiff>(/* sql */ ` + SELECT + * + FROM + DIFF + `); + + changes.push(...diffs); + } + }); + + const updateCount = 10; + for (let i = 0; i < updateCount; i++) { + // Create todos for both lists + await database.execute( + /* sql */ ` + UPDATE lists + set + name = 'updated ${i}' + WHERE + id = ?; + `, + [list.id] + ); + } + + await vi.waitFor( + () => { + expect(changes.length).toEqual(updateCount); + expect(changes.map((c) => c.name)).toEqual(Array.from({ length: updateCount }, (_, i) => `updated ${i}`)); + }, + { timeout: 1000 } + ); + + // clear the items + await database.execute( + /* sql */ ` + DELETE FROM lists + WHERE + id = ? + `, + [list.id] + ); + + await vi.waitFor( + () => { + expect(changes.length).toEqual(updateCount + 1); + expect(changes[changes.length - 1].__operation).eq(DiffTriggerOperation.DELETE); + // The delete diff should contain the previous value + expect(changes[changes.length - 1].name).eq(`updated ${updateCount - 1}`); + }, + { timeout: 1000 } + ); + }); + + /** + * Allows syncing the current state of the database with a lock context. + */ + databaseTest('Should accept hooks', async ({ database }) => { + await database.execute( + /* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), ?), + (uuid (), ?) + `, + ['test list 1', 'test list 2'] + ); + + const [firstList] = await database.getAll(/* sql */ ` + SELECT + * + FROM + lists + `); + + const todos: Database['todos'][] = []; + + const createTodo = async () => { + // Create todos for both lists + await database.writeLock(async (tx) => { + await tx.execute( + /* sql */ ` + INSERT INTO + todos (id, content, list_id) + VALUES + (uuid (), 'todo', ?) + `, + [firstList.id] + ); + }); + }; + + // Trigger the operations in a random order; + const todoCreationCount = 100; + const initialTodoCreationCount = 10; + + await Promise.all(Array.from({ length: initialTodoCreationCount }).map(createTodo)); + + // Configure the trigger to watch for changes. + // The onChange handler is guaranteed to see any change after the state above. + await database.triggers.trackTableDiff({ + source: 'todos', + columns: ['list_id'], + when: { + [DiffTriggerOperation.INSERT]: sanitizeSQL`json_extract(NEW.data, '$.list_id') = ${sanitizeUUID(firstList.id)}` + }, + onChange: async (context) => { + // Fetches the todo records that were inserted during this diff + const newTodos = await context.withDiff(/* sql */ ` + SELECT + todos.* + FROM + DIFF + JOIN todos ON DIFF.id = todos.id + `); + todos.push(...newTodos); + }, + hooks: { + beforeCreate: async (lockContext) => { + // This hook is executed inside the write lock before the trigger is created. + // It can be used to synchronize the current state and fetch all changes after the current state. + // Read the current state of the todos table + const currentTodos = await lockContext.getAll( + /* sql */ ` + SELECT + * + FROM + todos + WHERE + list_id = ? + `, + [firstList.id] + ); + + // Example code could process the current todos if necessary + todos.push(...currentTodos); + } + } + }); + + await Promise.all(Array.from({ length: todoCreationCount - initialTodoCreationCount }).map(createTodo)); + + // Wait for the changes to be processed and results to be collected + // We should have recorded all the todos which are present + await vi.waitFor( + async () => { + expect(todos.length).toEqual(todoCreationCount); + }, + { timeout: 1000, interval: 100 } + ); + }); + + databaseTest('Should extract diff values', async ({ database }) => { + await database.execute( + /* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), ?), + (uuid (), ?) + `, + ['test list 1', 'test list 2'] + ); + + const [firstList] = await database.getAll(/* sql */ ` + SELECT + * + FROM + lists + `); + + const changes: Array<{ content: string; operation: DiffTriggerOperation }> = []; + + const createTodo = async (content: string) => { + // Create todos for both lists + await database.writeLock(async (tx) => { + await tx.execute( + /* sql */ ` + INSERT INTO + todos (id, content, list_id) + VALUES + (uuid (), ?, ?) + `, + [content, firstList.id] + ); + }); + }; + + // Configure the trigger to watch for changes. + // The onChange handler is guaranteed to see any change after the state above. + await database.triggers.trackTableDiff({ + source: 'todos', + when: { + [DiffTriggerOperation.INSERT]: sanitizeSQL`json_extract(NEW.data, '$.list_id') = ${sanitizeUUID(firstList.id)}` + }, + onChange: async (context) => { + // Fetches the content of the records at the time of the operation + const extractedDiff = await context.withExtractedDiff<{ content: string; operation: DiffTriggerOperation }>( + /* sql */ ` + SELECT + -- Get the values at the time of the operation + content, + __operation as operation + FROM + DIFF + ` + ); + changes.push(...extractedDiff); + } + }); + + await createTodo('todo 1'); + await createTodo('todo 2'); + await createTodo('todo 3'); + + // Wait for the changes to be processed and results to be collected + // We should have recorded all the todos which are present + await vi.waitFor( + async () => { + expect(changes.length).toEqual(3); + expect(changes.map((c) => c.content)).toEqual(['todo 1', 'todo 2', 'todo 3']); + expect(changes.every((c) => c.operation === DiffTriggerOperation.INSERT)).toBeTruthy(); + }, + { timeout: 1000, interval: 100 } + ); + }); + + databaseTest('Should allow tracking 0 columns', async ({ database }) => { + /** + * Tracks the ids of todos reported via the trigger + */ + const changes: string[] = []; + + /** + * Tracks the ids of todos created + */ + const ids: string[] = []; + const createTodo = async (content: string) => { + // Create todos for both lists + return database.writeLock(async (tx) => { + const result = await tx.execute( + /* sql */ ` + INSERT INTO + todos (id, content) + VALUES + (uuid (), ?) RETURNING id + `, + [content] + ); + return result.rows?._array?.[0].id; + }); + }; + + await database.triggers.trackTableDiff({ + source: 'todos', + when: { + [DiffTriggerOperation.INSERT]: 'TRUE', + [DiffTriggerOperation.UPDATE]: 'TRUE', + [DiffTriggerOperation.DELETE]: 'TRUE' + }, + // Only track the row ids + columns: [], + onChange: async (context) => { + // Fetches the content of the records at the time of the operation + const extractedDiff = await context.withExtractedDiff<{ id: string }>(/* sql */ ` + SELECT + * + FROM + DIFF + `); + changes.push(...extractedDiff.map((d) => d.id)); + } + }); + + ids.push(await createTodo('todo 1')); + ids.push(await createTodo('todo 2')); + const updatedId = await createTodo('todo 3'); + ids.push(updatedId); + + await database.execute(/* sql */ ` + UPDATE todos + SET + content = 'todo 4' + WHERE + content = 'todo 3' + `); + // keep track of updates for comparison + ids.push(updatedId); + + await database.execute(/* sql */ ` + DELETE FROM todos + WHERE + content = 'todo 4' + `); + ids.push(updatedId); + + // Wait for the changes to be processed and results to be collected + // We should have recorded all the todos which are present + await vi.waitFor( + async () => { + expect(changes).toEqual(ids); + }, + { timeout: 1000, interval: 100 } + ); + }); + + databaseTest('Should only track listed columns', async ({ database }) => { + const newSchema = new Schema({ + todos: new Table({ + content: column.text, + columnA: column.text, + columnB: column.text + }) + }); + await database.updateSchema(newSchema); + + type NewTodoRecord = (typeof newSchema)['types']['todos']; + + const changes: ExtractedTriggerDiffRecord[] = []; + + const createTodo = async (content: string, columnA = 'A', columnB = 'B'): Promise => { + // Create todos for both lists + return database.writeLock(async (tx) => { + const result = await tx.execute( + /* sql */ ` + INSERT INTO + todos (id, content, columnA, columnB) + VALUES + (uuid (), ?, ?, ?) RETURNING id + `, + [content, columnA, columnB] + ); + return result.rows?._array?.[0]; + }); + }; + + await database.triggers.trackTableDiff({ + source: 'todos', + when: { + [DiffTriggerOperation.INSERT]: 'TRUE', + [DiffTriggerOperation.UPDATE]: 'TRUE', + [DiffTriggerOperation.DELETE]: 'TRUE' + }, + columns: ['columnA'], + onChange: async (context) => { + // Fetches the content of the records at the time of the operation + const extractedDiff = await context.withExtractedDiff>(/* sql */ ` + SELECT + * + FROM + DIFF + `); + changes.push(...extractedDiff); + } + }); + + await createTodo('todo 1'); + await createTodo('todo 2'); + await createTodo('todo 3'); + + // Do an update operation to ensure only the tracked columns of updated values are stored + await database.execute(/* sql */ ` + UPDATE todos + SET + content = 'todo 4' + WHERE + content = 'todo 3' + `); + + // Do a delete operation to ensure only the tracked columns of updated values are stored + await database.execute(/* sql */ ` + DELETE FROM todos + WHERE + content = 'todo 4' + `); + + // Wait for all the changes to be recorded + await vi.waitFor( + async () => { + expect(changes.length).toEqual(5); + }, + { timeout: 1000, interval: 100 } + ); + + // Inserts should only have the tracked columns + expect(changes[0].__operation).eq(DiffTriggerOperation.INSERT); + expect(changes[1].__operation).eq(DiffTriggerOperation.INSERT); + expect(changes[2].__operation).eq(DiffTriggerOperation.INSERT); + // Should not track this column + expect(changes[0].columnB).toBeUndefined(); + + expect(changes[3].__operation).eq(DiffTriggerOperation.UPDATE); + expect(changes[3].columnB).toBeUndefined(); + expect(changes[3].__previous_value).toBeDefined(); + expect(Object.keys(JSON.parse(changes[3].__previous_value))).to.deep.equal(['columnA']); + + // For deletes we extract the old value for convenience (there is no new value) + expect(changes[4].__operation).eq(DiffTriggerOperation.DELETE); + expect(changes[4].columnB).toBeUndefined(); + expect(changes[4].__previous_value).toBeNull(); + }); +}); diff --git a/packages/node/tests/tsconfig.json b/packages/node/tests/tsconfig.json new file mode 100644 index 000000000..72cf3b6bf --- /dev/null +++ b/packages/node/tests/tsconfig.json @@ -0,0 +1,10 @@ +{ + "references": [ + { + "path": "../../common" + }, + { + "path": "../" + } + ] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5ecdfdcda..d059c9e8f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18769,8 +18769,8 @@ packages: sprintf-js@1.1.3: resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==} - sql-formatter@15.6.2: - resolution: {integrity: sha512-ZjqOfJGuB97UeHzTJoTbadlM0h9ynehtSTHNUbGfXR4HZ4rCIoD2oIW91W+A5oE76k8hl0Uz5GD8Sx3Pt9Xa3w==} + sql-formatter@15.6.6: + resolution: {integrity: sha512-bZydXEXhaNDQBr8xYHC3a8thwcaMuTBp0CkKGjwGYDsIB26tnlWeWPwJtSQ0TEwiJcz9iJJON5mFPkx7XroHcg==} hasBin: true sql.js@1.13.0: @@ -42009,7 +42009,7 @@ snapshots: jsox: 1.2.123 node-sql-parser: 4.18.0 prettier: 3.5.3 - sql-formatter: 15.6.2 + sql-formatter: 15.6.6 tslib: 2.8.1 prettier@2.8.8: {} @@ -44411,7 +44411,7 @@ snapshots: sprintf-js@1.1.3: {} - sql-formatter@15.6.2: + sql-formatter@15.6.6: dependencies: argparse: 2.0.1 nearley: 2.20.1