Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"devDependencies": {
"@powersync/drizzle-driver": "workspace:*",
"@types/async-lock": "^1.4.0",
"@types/node": "^24.2.0",
"drizzle-orm": "^0.35.2",
"rollup": "4.14.3",
"typescript": "^5.5.3",
Expand Down
9 changes: 8 additions & 1 deletion packages/node/src/db/AsyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { QueryResult } from '@powersync/common';
import { NodeDatabaseImplementation } from './options.js';

export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
rows?: {
Expand All @@ -7,8 +8,14 @@ export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
};
};

export interface AsyncDatabaseOpenOptions {
path: string;
isWriter: boolean;
implementation: NodeDatabaseImplementation;
}

export interface AsyncDatabaseOpener {
open(path: string, isWriter: boolean): Promise<AsyncDatabase>;
open(options: AsyncDatabaseOpenOptions): Promise<AsyncDatabase>;
}

export interface AsyncDatabase {
Expand Down
6 changes: 5 additions & 1 deletion packages/node/src/db/BetterSQLite3DBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ export class BetterSQLite3DBAdapter extends BaseObserver<DBAdapterListener> impl
console.error('Unexpected PowerSync database worker error', e);
});

const database = (await comlink.open(dbFilePath, isWriter)) as Remote<AsyncDatabase>;
const database = (await comlink.open({
path: dbFilePath,
isWriter,
implementation: this.options.implementation ?? 'better-sqlite3'
})) as Remote<AsyncDatabase>;
return new RemoteConnection(worker, comlink, database);
};

Expand Down
104 changes: 104 additions & 0 deletions packages/node/src/db/BetterSqliteWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import * as Comlink from 'comlink';
import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3';
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';
import { PowerSyncWorkerOptions } from './SqliteWorker.js';
import { threadId } from 'node:worker_threads';

class BlockingAsyncDatabase implements AsyncDatabase {
private readonly db: Database;

private readonly uncommittedUpdatedTables = new Set<string>();
private readonly committedUpdatedTables = new Set<string>();

constructor(db: Database) {
this.db = db;

db.function('node_thread_id', () => threadId);
}

collectCommittedUpdates() {
const resolved = Promise.resolve([...this.committedUpdatedTables]);
this.committedUpdatedTables.clear();
return resolved;
}

installUpdateHooks() {
this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => {
this.uncommittedUpdatedTables.add(tableName);
});

this.db.commitHook(() => {
for (const tableName of this.uncommittedUpdatedTables) {
this.committedUpdatedTables.add(tableName);
}
this.uncommittedUpdatedTables.clear();
return true;
});

this.db.rollbackHook(() => {
this.uncommittedUpdatedTables.clear();
});
}

async close() {
this.db.close();
}

async execute(query: string, params: any[]) {
const stmt = this.db.prepare(query);
if (stmt.reader) {
const rows = stmt.all(params);
return {
rowsAffected: 0,
rows: {
_array: rows,
length: rows.length
}
};
} else {
const info = stmt.run(params);
return {
rowsAffected: info.changes,
insertId: Number(info.lastInsertRowid)
};
}
}

async executeRaw(query: string, params: any[]) {
const stmt = this.db.prepare(query);

if (stmt.reader) {
return stmt.raw().all(params);
} else {
stmt.raw().run(params);
return [];
}
}

async executeBatch(query: string, params: any[][]) {
params = params ?? [];

let rowsAffected = 0;

const stmt = this.db.prepare(query);
for (const paramSet of params) {
const info = stmt.run(paramSet);
rowsAffected += info.changes;
}

return { rowsAffected };
}
}

export async function openDatabase(worker: PowerSyncWorkerOptions, options: AsyncDatabaseOpenOptions) {
const baseDB = new BetterSQLite3Database(options.path);
baseDB.pragma('journal_mode = WAL');
baseDB.loadExtension(worker.extensionPath(), 'sqlite3_powersync_init');
if (!options.isWriter) {
baseDB.pragma('query_only = true');
}

const asyncDb = new BlockingAsyncDatabase(baseDB);
asyncDb.installUpdateHooks();
return asyncDb;
}
77 changes: 77 additions & 0 deletions packages/node/src/db/NodeSqliteWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { threadId } from 'node:worker_threads';
import type { DatabaseSync } from 'node:sqlite';

import * as Comlink from 'comlink';
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';
import { PowerSyncWorkerOptions } from './SqliteWorker.js';

class BlockingNodeDatabase implements AsyncDatabase {
private readonly db: DatabaseSync;

constructor(db: DatabaseSync, write: boolean) {
this.db = db;

db.function('node_thread_id', () => threadId);
if (write) {
db.exec("SELECT powersync_update_hooks('install');");
}
}

async collectCommittedUpdates() {
const stmt = this.db.prepare("SELECT powersync_update_hooks('get') AS r;");
const row = stmt.get()!;

return JSON.parse(row['r'] as string) as string[];
}

async close() {
this.db.close();
}

async execute(query: string, params: any[]) {
const stmt = this.db.prepare(query);
const rows = stmt.all(...params);
return {
rowsAffected: 0,
rows: {
_array: rows,
length: rows.length
}
};
}

async executeRaw(query: string, params: any[]) {
const stmt = this.db.prepare(query);
(stmt as any).setReturnArrays(true); // Missing in @types/node, https://nodejs.org/api/sqlite.html#statementsetreturnarraysenabled
return stmt.all(...params) as any as any[][];
}

async executeBatch(query: string, params: any[][]) {
params = params ?? [];

let rowsAffected = 0;

const stmt = this.db.prepare(query);
for (const paramSet of params) {
const info = stmt.run(...paramSet);
rowsAffected += info.changes as number;
}

return { rowsAffected };
}
}

export async function openDatabase(worker: PowerSyncWorkerOptions, options: AsyncDatabaseOpenOptions) {
// NOTE: We want to import node:sqlite dynamically, to avoid bundlers unconditionally requiring node:sqlite in the
// end, since that would make us incompatible with older Node.JS versions.
const { DatabaseSync } = await import('node:sqlite');

const baseDB = new DatabaseSync(options.path, { allowExtension: true });
baseDB.exec('pragma journal_mode = WAL');
baseDB.loadExtension(worker.extensionPath());
if (!options.isWriter) {
baseDB.exec('pragma query_only = true');
}

return new BlockingNodeDatabase(baseDB, options.isWriter);
}
142 changes: 30 additions & 112 deletions packages/node/src/db/SqliteWorker.ts
Original file line number Diff line number Diff line change
@@ -1,118 +1,11 @@
import * as path from 'node:path';
import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3';
import * as Comlink from 'comlink';
import { parentPort, threadId } from 'node:worker_threads';
import { parentPort } from 'node:worker_threads';
import OS from 'node:os';
import url from 'node:url';
import { AsyncDatabase, AsyncDatabaseOpener } from './AsyncDatabase.js';

class BlockingAsyncDatabase implements AsyncDatabase {
private readonly db: Database;

private readonly uncommittedUpdatedTables = new Set<string>();
private readonly committedUpdatedTables = new Set<string>();

constructor(db: Database) {
this.db = db;

db.function('node_thread_id', () => threadId);
}

collectCommittedUpdates() {
const resolved = Promise.resolve([...this.committedUpdatedTables]);
this.committedUpdatedTables.clear();
return resolved;
}

installUpdateHooks() {
this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => {
this.uncommittedUpdatedTables.add(tableName);
});

this.db.commitHook(() => {
for (const tableName of this.uncommittedUpdatedTables) {
this.committedUpdatedTables.add(tableName);
}
this.uncommittedUpdatedTables.clear();
return true;
});

this.db.rollbackHook(() => {
this.uncommittedUpdatedTables.clear();
});
}

async close() {
this.db.close();
}

async execute(query: string, params: any[]) {
const stmt = this.db.prepare(query);
if (stmt.reader) {
const rows = stmt.all(params);
return {
rowsAffected: 0,
rows: {
_array: rows,
length: rows.length
}
};
} else {
const info = stmt.run(params);
return {
rowsAffected: info.changes,
insertId: Number(info.lastInsertRowid)
};
}
}

async executeRaw(query: string, params: any[]) {
const stmt = this.db.prepare(query);

if (stmt.reader) {
return stmt.raw().all(params);
} else {
stmt.raw().run(params);
return [];
}
}

async executeBatch(query: string, params: any[][]) {
params = params ?? [];

let rowsAffected = 0;

const stmt = this.db.prepare(query);
for (const paramSet of params) {
const info = stmt.run(paramSet);
rowsAffected += info.changes;
}

return { rowsAffected };
}
}

class BetterSqliteWorker implements AsyncDatabaseOpener {
options: PowerSyncWorkerOptions;

constructor(options: PowerSyncWorkerOptions) {
this.options = options;
}

async open(path: string, isWriter: boolean): Promise<AsyncDatabase> {
const baseDB = new BetterSQLite3Database(path);
baseDB.pragma('journal_mode = WAL');
baseDB.loadExtension(this.options.extensionPath(), 'sqlite3_powersync_init');
if (!isWriter) {
baseDB.pragma('query_only = true');
}

const asyncDb = new BlockingAsyncDatabase(baseDB);
asyncDb.installUpdateHooks();

return Comlink.proxy(asyncDb);
}
}
import { openDatabase as openBetterSqliteDatabase } from './BetterSqliteWorker.js';
import { openDatabase as openNodeDatabase } from './NodeSqliteWorker.js';
import { AsyncDatabase, AsyncDatabaseOpener, AsyncDatabaseOpenOptions } from './AsyncDatabase.js';

export interface PowerSyncWorkerOptions {
/**
Expand Down Expand Up @@ -152,5 +45,30 @@ export function startPowerSyncWorker(options?: Partial<PowerSyncWorkerOptions>)
...options
};

Comlink.expose(new BetterSqliteWorker(resolvedOptions), parentPort! as Comlink.Endpoint);
Comlink.expose(new DatabaseOpenHelper(resolvedOptions), parentPort! as Comlink.Endpoint);
}

class DatabaseOpenHelper implements AsyncDatabaseOpener {
private options: PowerSyncWorkerOptions;

constructor(options: PowerSyncWorkerOptions) {
this.options = options;
}

async open(options: AsyncDatabaseOpenOptions): Promise<AsyncDatabase> {
let database: AsyncDatabase;

switch (options.implementation) {
case 'better-sqlite3':
database = await openBetterSqliteDatabase(this.options, options);
break;
case 'node':
database = await openNodeDatabase(this.options, options);
break;
default:
throw new Error(`Unknown database implementation: ${options.implementation}.`);
}

return Comlink.proxy(database);
}
}
4 changes: 4 additions & 0 deletions packages/node/src/db/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import { SQLOpenOptions } from '@powersync/common';

export type WorkerOpener = (...args: ConstructorParameters<typeof Worker>) => InstanceType<typeof Worker>;

export type NodeDatabaseImplementation = 'better-sqlite3' | 'node';

/**
* The {@link SQLOpenOptions} available across all PowerSync SDKs for JavaScript extended with
* Node.JS-specific options.
*/
export interface NodeSQLOpenOptions extends SQLOpenOptions {
implementation?: NodeDatabaseImplementation;

/**
* The Node.JS SDK will use one worker to run writing queries and additional workers to run reads.
* This option controls how many workers to use for reads.
Expand Down
Loading