Skip to content

Commit d8f305f

Browse files
committed
Add getCrudTransactions()
1 parent 03831dc commit d8f305f

File tree

3 files changed

+103
-22
lines changed

3 files changed

+103
-22
lines changed

.changeset/nice-dragons-smile.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/node': minor
4+
'@powersync/react-native': minor
5+
'@powersync/web': minor
6+
---
7+
8+
Add `getCrudTransactions()`, returning an async iterator of transactions. This can be used to batch transactions when uploading CRUD data.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -632,35 +632,72 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
632632
* @returns A transaction of CRUD operations to upload, or null if there are none
633633
*/
634634
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
635-
return await this.readTransaction(async (tx) => {
636-
const first = await tx.getOptional<CrudEntryJSON>(
637-
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`
638-
);
635+
for await (const transaction of this.getCrudTransactions()) {
636+
return transaction;
637+
}
639638

640-
if (!first) {
641-
return null;
642-
}
643-
const txId = first.tx_id;
639+
return null;
640+
}
644641

645-
let all: CrudEntry[];
646-
if (!txId) {
647-
all = [CrudEntry.fromRow(first)];
648-
} else {
649-
const result = await tx.getAll<CrudEntryJSON>(
650-
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
651-
[txId]
652-
);
653-
all = result.map((row) => CrudEntry.fromRow(row));
642+
/**
643+
* Returns an async iterator of completed transactions with local writes against the database.
644+
*
645+
* This is typically used from the {@link PowerSyncBackendConnector.uploadData} callback. Each entry emitted by the
646+
* returned flow is a full transaction containing all local writes made while that transaction was active.
647+
*
648+
* Unlike {@link getNextCrudTransaction}, which always returns the oldest transaction that hasn't been
649+
* {@link CrudTransaction.complete}d yet, this flow can be used to collect multiple transactions. Calling
650+
* {@link CrudTransaction.complete} will mark _all_ transactions emitted by the flow until that point as completed.
651+
*
652+
* This can be used to upload multiple transactions in a single batch, e.g with:
653+
*
654+
* ```TypeScript
655+
* let lastTransaction: CrudTransaction | null = null;
656+
* let batch: CrudEntry[] = [];
657+
*
658+
* for await (const transaction of database.getCrudTransactions()) {
659+
* batch.push(...transaction.crud);
660+
* lastTransaction = transaction;
661+
*
662+
* if (batch.length > 10) {
663+
* break;
664+
* }
665+
* }
666+
* ```
667+
*
668+
* If there is no local data to upload, the async iterator complete without emitting any items.
669+
*/
670+
async *getCrudTransactions(): AsyncIterable<CrudTransaction> {
671+
let lastCrudItemId = -1;
672+
const sql = `
673+
WITH RECURSIVE crud_entries AS (
674+
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
675+
UNION ALL
676+
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
677+
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
678+
WHERE crud_entries.tx_id = ps_crud.tx_id
679+
)
680+
SELECT * FROM crud_entries;
681+
`;
682+
683+
while (true) {
684+
const nextTransaction = await this.database.getAll<CrudEntryJSON>(sql, [lastCrudItemId]);
685+
if (nextTransaction.length == 0) {
686+
break;
654687
}
655688

656-
const last = all[all.length - 1];
689+
const items = nextTransaction.map((row) => CrudEntry.fromRow(row));
690+
const last = items[items.length - 1];
691+
const txId = last.transactionId;
657692

658-
return new CrudTransaction(
659-
all,
693+
yield new CrudTransaction(
694+
items,
660695
async (writeCheckpoint?: string) => this.handleCrudCheckpoint(last.clientId, writeCheckpoint),
661696
txId
662697
);
663-
});
698+
699+
lastCrudItemId = last.clientId;
700+
}
664701
}
665702

666703
/**

packages/node/tests/PowerSyncDatabase.test.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Worker } from 'node:worker_threads';
33

44
import { vi, expect, test } from 'vitest';
55
import { AppSchema, databaseTest, tempDirectoryTest } from './utils';
6-
import { PowerSyncDatabase } from '../lib';
6+
import { CrudEntry, CrudTransaction, PowerSyncDatabase } from '../lib';
77
import { WorkerOpener } from '../lib/db/options';
88

99
test('validates options', async () => {
@@ -131,3 +131,39 @@ databaseTest.skip('can watch queries', async ({ database }) => {
131131
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']);
132132
expect((await query.next()).value.rows).toHaveLength(4);
133133
});
134+
135+
databaseTest('getCrudTransactions', async ({ database }) => {
136+
async function createTransaction(amount: number) {
137+
await database.writeTransaction(async (tx) => {
138+
for (let i = 0; i < amount; i++) {
139+
await tx.execute('insert into todos (id) values (uuid())');
140+
}
141+
});
142+
}
143+
144+
let iterator = database.getCrudTransactions()[Symbol.asyncIterator]();
145+
expect(await iterator.next()).toMatchObject({ done: true });
146+
147+
await createTransaction(5);
148+
await createTransaction(10);
149+
await createTransaction(15);
150+
151+
let lastTransaction: CrudTransaction | null = null;
152+
let batch: CrudEntry[] = [];
153+
154+
// Take the first two transactions via the async generator.
155+
for await (const transaction of database.getCrudTransactions()) {
156+
batch.push(...transaction.crud);
157+
lastTransaction = transaction;
158+
159+
if (batch.length > 10) {
160+
break;
161+
}
162+
}
163+
164+
expect(batch).toHaveLength(15);
165+
await lastTransaction!.complete();
166+
167+
const remainingTransaction = await database.getNextCrudTransaction();
168+
expect(remainingTransaction?.crud).toHaveLength(15);
169+
});

0 commit comments

Comments
 (0)