diff --git a/packages/attachments/package.json b/packages/attachments/package.json index 9bcb3a9b5..1bd86678f 100644 --- a/packages/attachments/package.json +++ b/packages/attachments/package.json @@ -49,8 +49,10 @@ }, "devDependencies": { "@powersync/common": "workspace:*", + "@powersync/web": "workspace:*", "@types/node": "^20.17.6", "vite": "^6.1.0", - "vite-plugin-top-level-await": "^1.4.4" + "vite-plugin-top-level-await": "^1.4.4", + "vite-plugin-wasm": "^3.3.0" } } diff --git a/packages/attachments/src/AbstractAttachmentQueue.ts b/packages/attachments/src/AbstractAttachmentQueue.ts deleted file mode 100644 index de835d44e..000000000 --- a/packages/attachments/src/AbstractAttachmentQueue.ts +++ /dev/null @@ -1,539 +0,0 @@ -import { AbstractPowerSyncDatabase, Transaction } from '@powersync/common'; -import { ATTACHMENT_TABLE, AttachmentRecord, AttachmentState } from './Schema.js'; -import { EncodingType, StorageAdapter } from './StorageAdapter.js'; - -export interface AttachmentQueueOptions { - powersync: AbstractPowerSyncDatabase; - storage: StorageAdapter; - /** - * How often to check for new attachments to sync, in milliseconds. Set to 0 or undefined to disable. - */ - syncInterval?: number; - /** - * How many attachments to keep in the cache - */ - cacheLimit?: number; - /** - * The name of the directory where attachments are stored on the device, not the full path. Defaults to `attachments`. - */ - attachmentDirectoryName?: string; - - /** - * The name of the table where attachments are stored, defaults to `attachments` table. - */ - attachmentTableName?: string; - /** - * Whether to mark the initial watched attachment IDs to be synced - */ - performInitialSync?: boolean; - /** - * Should attachments be downloaded - */ - downloadAttachments?: boolean; - /** - * How to handle download errors, return { retry: false } to ignore the download - */ - onDownloadError?: (attachment: AttachmentRecord, exception: any) => Promise<{ retry?: boolean }>; - /** - * How to handle upload errors, return { retry: false } to ignore the upload - */ - onUploadError?: (attachment: AttachmentRecord, exception: any) => Promise<{ retry?: boolean }>; -} - -export const DEFAULT_ATTACHMENT_QUEUE_OPTIONS: Partial = { - attachmentDirectoryName: ATTACHMENT_TABLE, - attachmentTableName: ATTACHMENT_TABLE, - syncInterval: 30_000, - cacheLimit: 100, - performInitialSync: true, - downloadAttachments: true -}; - -export abstract class AbstractAttachmentQueue { - uploading: boolean; - downloading: boolean; - initialSync: boolean; - options: T; - downloadQueue: Set; - - constructor(options: T) { - this.options = { - ...DEFAULT_ATTACHMENT_QUEUE_OPTIONS, - ...options - }; - this.downloadQueue = new Set(); - this.uploading = false; - this.downloading = false; - this.initialSync = this.options.performInitialSync; - } - - /** - * Takes in a callback that gets invoked with attachment IDs that need to be synced. - * In most cases this will contain a watch query. - * - * @example - * ```javascript - * onAttachmentIdsChange(onUpdate) { - * this.powersync.watch('SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL', [], { - * onResult: (result) => onUpdate(result.rows?._array.map((r) => r.id) ?? []) - * }); - * } - * ``` - */ - abstract onAttachmentIdsChange(onUpdate: (ids: string[]) => void): void; - - /** - * Create a new AttachmentRecord, this gets called when the attachment id is not found in the database. - */ - abstract newAttachmentRecord(record?: Partial): Promise; - - protected get powersync() { - return this.options.powersync; - } - - get logger() { - return this.powersync.logger ?? console; - } - - protected get storage() { - return this.options.storage; - } - - get table() { - return this.options.attachmentTableName!; - } - - async init() { - // Ensure the directory where attachments are downloaded, exists - await this.storage.makeDir(this.storageDirectory); - - this.watchAttachmentIds(); - this.watchUploads(); - this.watchDownloads(); - - if (this.options.syncInterval > 0) { - // In addition to watching for changes, we also trigger a sync every few seconds (30 seconds, by default) - // This will retry any failed uploads/downloads, in particular after the app was offline - setInterval(() => this.trigger(), this.options.syncInterval); - } - } - - trigger() { - this.uploadRecords(); - this.downloadRecords(); - this.expireCache(); - } - - async watchAttachmentIds() { - this.onAttachmentIdsChange(async (ids) => { - const _ids = `${ids.map((id) => `'${id}'`).join(',')}`; - this.logger.debug(`Queuing for sync, attachment IDs: [${_ids}]`); - - if (this.initialSync) { - this.initialSync = false; - // Mark AttachmentIds for sync - await this.powersync.execute( - `UPDATE - ${this.table} - SET state = ${AttachmentState.QUEUED_SYNC} - WHERE - state < ${AttachmentState.SYNCED} - AND - id IN (${_ids})` - ); - } - - const attachmentsInDatabase = await this.powersync.getAll( - `SELECT * FROM ${this.table} WHERE state < ${AttachmentState.ARCHIVED}` - ); - - for (const id of ids) { - const record = attachmentsInDatabase.find((r) => r.id == id); - // 1. ID is not in the database - if (!record) { - const newRecord = await this.newAttachmentRecord({ - id: id, - state: AttachmentState.QUEUED_SYNC - }); - this.logger.debug(`Attachment (${id}) not found in database, creating new record`); - await this.saveToQueue(newRecord); - } else if (record.local_uri == null || !(await this.storage.fileExists(this.getLocalUri(record.local_uri)))) { - // 2. Attachment in database but no local file, mark as queued download - this.logger.debug(`Attachment (${id}) found in database but no local file, marking as queued download`); - await this.update({ - ...record, - state: AttachmentState.QUEUED_DOWNLOAD - }); - } - } - - // 3. Attachment in database and not in AttachmentIds, mark as archived - await this.powersync.execute( - `UPDATE ${this.table} - SET state = ${AttachmentState.ARCHIVED} - WHERE - state < ${AttachmentState.ARCHIVED} - AND - id NOT IN (${ids.map((id) => `'${id}'`).join(',')})` - ); - }); - } - - async saveToQueue(record: Omit): Promise { - const updatedRecord: AttachmentRecord = { - ...record, - timestamp: new Date().getTime() - }; - - await this.powersync.execute( - `INSERT OR REPLACE INTO ${this.table} (id, timestamp, filename, local_uri, media_type, size, state) VALUES (?, ?, ?, ?, ?, ?, ?)`, - [ - updatedRecord.id, - updatedRecord.timestamp, - updatedRecord.filename, - updatedRecord.local_uri || null, - updatedRecord.media_type || null, - updatedRecord.size || null, - updatedRecord.state - ] - ); - - return updatedRecord; - } - - async record(id: string): Promise { - return this.powersync.getOptional(`SELECT * FROM ${this.table} WHERE id = ?`, [id]); - } - - async update(record: Omit): Promise { - const timestamp = new Date().getTime(); - await this.powersync.execute( - `UPDATE ${this.table} - SET - timestamp = ?, - filename = ?, - local_uri = ?, - size = ?, - media_type = ?, - state = ? - WHERE id = ?`, - [timestamp, record.filename, record.local_uri || null, record.size, record.media_type, record.state, record.id] - ); - } - - async delete(record: AttachmentRecord, tx?: Transaction): Promise { - const deleteRecord = async (tx: Transaction) => { - await tx.execute( - `DELETE - FROM ${this.table} - WHERE id = ?`, - [record.id] - ); - }; - - if (tx) { - await deleteRecord(tx); - } else { - await this.powersync.writeTransaction(deleteRecord); - } - - const localFilePathUri = this.getLocalUri(record.local_uri || this.getLocalFilePathSuffix(record.filename)); - - try { - // Delete file on storage - await this.storage.deleteFile(localFilePathUri, { - filename: record.filename - }); - } catch (e) { - this.logger.error(e); - } - } - - async getNextUploadRecord(): Promise { - return this.powersync.getOptional( - `SELECT * - FROM ${this.table} - WHERE - local_uri IS NOT NULL - AND - (state = ${AttachmentState.QUEUED_UPLOAD} - OR - state = ${AttachmentState.QUEUED_SYNC}) - ORDER BY timestamp ASC` - ); - } - - async uploadAttachment(record: AttachmentRecord) { - if (!record.local_uri) { - throw new Error(`No local_uri for record ${JSON.stringify(record, null, 2)}`); - } - - const localFilePathUri = this.getLocalUri(record.local_uri); - try { - if (!(await this.storage.fileExists(localFilePathUri))) { - this.logger.warn(`File for ${record.id} does not exist, skipping upload`); - await this.update({ - ...record, - state: AttachmentState.QUEUED_DOWNLOAD - }); - return true; - } - - const fileBuffer = await this.storage.readFile(localFilePathUri, { - encoding: EncodingType.Base64, - mediaType: record.media_type - }); - - await this.storage.uploadFile(record.filename, fileBuffer, { - mediaType: record.media_type - }); - // Mark as uploaded - await this.update({ ...record, state: AttachmentState.SYNCED }); - this.logger.debug(`Uploaded attachment "${record.id}" to Cloud Storage`); - return true; - } catch (e: any) { - if (e.error == 'Duplicate') { - this.logger.debug(`File already uploaded, marking ${record.id} as synced`); - await this.update({ ...record, state: AttachmentState.SYNCED }); - return false; - } - if (this.options.onUploadError) { - const { retry } = await this.options.onUploadError(record, e); - if (!retry) { - await this.update({ ...record, state: AttachmentState.ARCHIVED }); - return true; - } - } - this.logger.error(`UploadAttachment error for record ${JSON.stringify(record, null, 2)}`); - return false; - } - } - - async downloadRecord(record: AttachmentRecord) { - if (!this.options.downloadAttachments) { - return false; - } - if (!record.local_uri) { - record.local_uri = this.getLocalFilePathSuffix(record.filename); - } - const localFilePathUri = this.getLocalUri(record.local_uri); - if (await this.storage.fileExists(localFilePathUri)) { - this.logger.debug(`Local file already downloaded, marking "${record.id}" as synced`); - await this.update({ ...record, state: AttachmentState.SYNCED }); - return true; - } - - try { - const fileBlob = await this.storage.downloadFile(record.filename); - - // Convert the blob data into a base64 string - const base64Data = await new Promise((resolve, reject) => { - const reader = new FileReader(); - reader.onloadend = () => { - // remove the header from the result: 'data:*/*;base64,' - resolve(reader.result?.toString().replace(/^data:.+;base64,/, '') || ''); - }; - reader.onerror = reject; - reader.readAsDataURL(fileBlob); - }); - - // Ensure directory exists - await this.storage.makeDir(localFilePathUri.replace(record.filename, '')); - // Write the file - await this.storage.writeFile(localFilePathUri, base64Data, { - encoding: EncodingType.Base64 - }); - - await this.update({ - ...record, - media_type: fileBlob.type, - state: AttachmentState.SYNCED - }); - this.logger.debug(`Downloaded attachment "${record.id}"`); - return true; - } catch (e) { - if (this.options.onDownloadError) { - const { retry } = await this.options.onDownloadError(record, e); - if (!retry) { - await this.update({ ...record, state: AttachmentState.ARCHIVED }); - return true; - } - } - this.logger.error(`Download attachment error for record ${JSON.stringify(record, null, 2)}`, e); - } - return false; - } - - idsToUpload(onResult: (ids: string[]) => void): void { - this.powersync.watch( - `SELECT id - FROM ${this.table} - WHERE - local_uri IS NOT NULL - AND - (state = ${AttachmentState.QUEUED_UPLOAD} - OR - state = ${AttachmentState.QUEUED_SYNC})`, - [], - { onResult: (result) => onResult(result.rows?._array.map((r) => r.id) || []) } - ); - } - - watchUploads() { - this.idsToUpload(async (ids) => { - if (ids.length > 0) { - await this.uploadRecords(); - } - }); - } - - /** - * Returns immediately if another loop is in progress. - */ - private async uploadRecords() { - if (this.uploading) { - return; - } - this.uploading = true; - try { - let record = await this.getNextUploadRecord(); - if (!record) { - return; - } - this.logger.debug(`Uploading attachments...`); - while (record) { - const uploaded = await this.uploadAttachment(record); - if (!uploaded) { - // Then attachment failed to upload. We try all uploads when the next trigger() is called - break; - } - record = await this.getNextUploadRecord(); - } - this.logger.debug('Finished uploading attachments'); - } catch (error) { - this.logger.error('Upload failed:', error); - } finally { - this.uploading = false; - } - } - - async getIdsToDownload(): Promise { - const res = await this.powersync.getAll<{ id: string }>( - `SELECT id - FROM ${this.table} - WHERE - state = ${AttachmentState.QUEUED_DOWNLOAD} - OR - state = ${AttachmentState.QUEUED_SYNC} - ORDER BY timestamp ASC` - ); - return res.map((r) => r.id); - } - - idsToDownload(onResult: (ids: string[]) => void): void { - this.powersync.watch( - `SELECT id - FROM ${this.table} - WHERE - state = ${AttachmentState.QUEUED_DOWNLOAD} - OR - state = ${AttachmentState.QUEUED_SYNC}`, - [], - { onResult: (result) => onResult(result.rows?._array.map((r) => r.id) || []) } - ); - } - - watchDownloads() { - if (!this.options.downloadAttachments) { - return; - } - this.idsToDownload(async (ids) => { - ids.map((id) => this.downloadQueue.add(id)); - // No need to await this, the lock will ensure only one loop is running at a time - this.downloadRecords(); - }); - } - - private async downloadRecords() { - if (!this.options.downloadAttachments) { - return; - } - if (this.downloading) { - return; - } - (await this.getIdsToDownload()).map((id) => this.downloadQueue.add(id)); - if (this.downloadQueue.size == 0) { - return; - } - - this.downloading = true; - try { - this.logger.debug(`Downloading ${this.downloadQueue.size} attachments...`); - while (this.downloadQueue.size > 0) { - const id = this.downloadQueue.values().next().value; - this.downloadQueue.delete(id); - const record = await this.record(id); - if (!record) { - continue; - } - await this.downloadRecord(record); - } - this.logger.debug('Finished downloading attachments'); - } catch (e) { - this.logger.error('Downloads failed:', e); - } finally { - this.downloading = false; - } - } - - /** - * Returns the local file path for the given filename, used to store in the database. - * Example: filename: "attachment-1.jpg" returns "attachments/attachment-1.jpg" - */ - getLocalFilePathSuffix(filename: string): string { - return `${this.options.attachmentDirectoryName}/${filename}`; - } - - /** - * Return users storage directory with the attachmentPath use to load the file. - * Example: filePath: "attachments/attachment-1.jpg" returns "/var/mobile/Containers/Data/Application/.../Library/attachments/attachment-1.jpg" - */ - getLocalUri(filePath: string): string { - return `${this.storage.getUserStorageDirectory()}/${filePath}`; - } - - /** - * Returns the directory where attachments are stored on the device, used to make dir - * Example: "/var/mobile/Containers/Data/Application/.../Library/attachments/" - */ - get storageDirectory() { - return `${this.storage.getUserStorageDirectory()}${this.options.attachmentDirectoryName}`; - } - - async expireCache() { - const res = await this.powersync.getAll(`SELECT * FROM ${this.table} - WHERE - state = ${AttachmentState.SYNCED} OR state = ${AttachmentState.ARCHIVED} - ORDER BY - timestamp DESC - LIMIT 100 OFFSET ${this.options.cacheLimit}`); - - if (res.length == 0) { - return; - } - - this.logger.debug(`Deleting ${res.length} attachments from cache...`); - await this.powersync.writeTransaction(async (tx) => { - for (const record of res) { - await this.delete(record, tx); - } - }); - } - - async clearQueue(): Promise { - this.logger.debug(`Clearing attachment queue...`); - await this.powersync.writeTransaction(async (tx) => { - await tx.execute(`DELETE FROM ${this.table}`); - }); - } -} diff --git a/packages/attachments/src/AttachmentContext.ts b/packages/attachments/src/AttachmentContext.ts new file mode 100644 index 000000000..e6d13cea8 --- /dev/null +++ b/packages/attachments/src/AttachmentContext.ts @@ -0,0 +1,159 @@ +import { AbstractPowerSyncDatabase, ILogger, Transaction } from '@powersync/common'; +import { AttachmentRecord, AttachmentState, attachmentFromSql } from './Schema.js'; + +export class AttachmentContext { + db: AbstractPowerSyncDatabase; + tableName: string; + logger: ILogger; + + constructor(db: AbstractPowerSyncDatabase, tableName: string = 'attachments', logger: ILogger) { + this.db = db; + this.tableName = tableName; + this.logger = logger; + } + + watchActiveAttachments(onUpdate: () => void): AbortController { + const abortController = new AbortController(); + this.db.watchWithCallback( + /* sql */ + ` + SELECT + * + FROM + ${this.tableName} + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + `, + [AttachmentState.QUEUED_UPLOAD, AttachmentState.QUEUED_DOWNLOAD, AttachmentState.QUEUED_DELETE], + { + onResult: () => { + onUpdate(); + } + }, + { + signal: abortController.signal + } + ); + + return abortController; + } + + async getActiveAttachments(): Promise { + const attachments = await this.db.getAll( + /* sql */ + ` + SELECT + * + FROM + ${this.tableName} + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + `, + [AttachmentState.QUEUED_UPLOAD, AttachmentState.QUEUED_DOWNLOAD, AttachmentState.QUEUED_DELETE] + ); + + return attachments.map(attachmentFromSql); + } + + async getArchivedAttachments(): Promise { + const attachments = await this.db.getAll( + /* sql */ + ` + SELECT + * + FROM + ${this.tableName} + WHERE + state = ? + ORDER BY + timestamp ASC + `, + [AttachmentState.ARCHIVED] + ); + + return attachments.map(attachmentFromSql); + } + + async getAttachments(): Promise { + const attachments = await this.db.getAll( + /* sql */ + ` + SELECT + * + FROM + ${this.tableName} + ORDER BY + timestamp ASC + `, + [] + ); + + return attachments.map(attachmentFromSql); + } + + upsertAttachment(attachment: AttachmentRecord, context: Transaction): void { + context.execute( + /* sql */ + ` + INSERT + OR REPLACE INTO ${this.tableName} ( + id, + filename, + local_uri, + size, + media_type, + timestamp, + state, + has_synced, + meta_data + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + [ + attachment.id, + attachment.filename, + attachment.localUri || null, + attachment.size || null, + attachment.mediaType || null, + attachment.timestamp, + attachment.state, + attachment.hasSynced ? 1 : 0, + attachment.metaData || null + ] + ); + } + + async deleteAttachment(attachmentId: string): Promise { + await this.db.writeTransaction((tx) => + tx.execute( + /* sql */ + ` + DELETE FROM ${this.tableName} + WHERE + id = ? + `, + [attachmentId] + ) + ); + } + + async saveAttachments(attachments: AttachmentRecord[]): Promise { + if (attachments.length === 0) { + return; + } + await this.db.writeTransaction(async (tx) => { + for (const attachment of attachments) { + this.upsertAttachment(attachment, tx); + } + }); + } +} diff --git a/packages/attachments/src/AttachmentQueue.ts b/packages/attachments/src/AttachmentQueue.ts new file mode 100644 index 000000000..ea008b5b4 --- /dev/null +++ b/packages/attachments/src/AttachmentQueue.ts @@ -0,0 +1,250 @@ +import { AbstractPowerSyncDatabase, ILogger } from '@powersync/common'; +import { AttachmentContext } from './AttachmentContext.js'; +import { LocalStorageAdapter } from './LocalStorageAdapter.js'; +import { RemoteStorageAdapter } from './RemoteStorageAdapter.js'; +import { ATTACHMENT_TABLE, AttachmentRecord, AttachmentState } from './Schema.js'; +import { StorageService } from './StorageService.js'; +import { WatchedAttachmentItem } from './WatchedAttachmentItem.js'; + +export class AttachmentQueue { + periodicSyncTimer?: ReturnType; + context: AttachmentContext; + storageService: StorageService; + localStorage: LocalStorageAdapter; + remoteStorage: RemoteStorageAdapter; + attachmentsDirectory?: string; + tableName?: string; + logger?: ILogger; + syncInterval: number = 30 * 1000; + syncThrottleDuration: number; + downloadAttachments: boolean = true; + watchActiveAbortController?: AbortController; + archivedCacheLimit: number; + + constructor({ + db, + localStorage, + remoteStorage, + watchAttachments, + logger, + tableName = ATTACHMENT_TABLE, + syncInterval = 30 * 1000, + syncThrottleDuration = 1000, + downloadAttachments = true, + archivedCacheLimit = 100 + }: { + db: AbstractPowerSyncDatabase; + remoteStorage: RemoteStorageAdapter; + localStorage: LocalStorageAdapter; + watchAttachments: (onUpdate: (attachement: WatchedAttachmentItem[]) => void) => void; + tableName?: string; + logger?: ILogger; + syncInterval?: number; + syncThrottleDuration?: number; + downloadAttachments?: boolean; + archivedCacheLimit?: number; + }) { + this.context = new AttachmentContext(db, tableName, logger ?? db.logger); + this.remoteStorage = remoteStorage; + this.localStorage = localStorage; + this.watchAttachments = watchAttachments; + this.tableName = tableName; + this.storageService = new StorageService(this.context, localStorage, remoteStorage, logger ?? db.logger); + this.syncInterval = syncInterval; + this.syncThrottleDuration = syncThrottleDuration; + this.downloadAttachments = downloadAttachments; + this.archivedCacheLimit = archivedCacheLimit; + } + + watchAttachments(onUpdate: (attachement: WatchedAttachmentItem[]) => void): void { + throw new Error('watchAttachments not implemented'); + } + + async startSync(): Promise { + await this.stopSync(); + + // Sync storage periodically + this.periodicSyncTimer = setInterval(async () => { + await this.syncStorage(); + }, this.syncInterval); + + // Sync storage when there is a change in active attachments + this.watchActiveAbortController = this.context.watchActiveAttachments(async () => { + await this.syncStorage(); + }); + + // Process attachments when there is a change in watched attachments + this.watchAttachments(async (watchedAttachments) => { + // Need to get all the attachments which are tracked in the DB. + // We might need to restore an archived attachment. + const currentAttachments = await this.context.getAttachments(); + const attachmentUpdates: AttachmentRecord[] = []; + + for (const watchedAttachment of watchedAttachments) { + const existingQueueItem = currentAttachments.find((a) => a.id === watchedAttachment.id); + if (!existingQueueItem) { + // Item is watched but not in the queue yet. Need to add it. + + if (!this.downloadAttachments) { + continue; + } + + const filename = `${watchedAttachment.id}.${watchedAttachment.fileExtension}`; + + attachmentUpdates.push({ + id: watchedAttachment.id, + filename, + state: AttachmentState.QUEUED_DOWNLOAD, + hasSynced: false, + metaData: watchedAttachment.metaData + }); + continue; + } + + if (existingQueueItem.state === AttachmentState.ARCHIVED) { + // The attachment is present again. Need to queue it for sync. + // We might be able to optimize this in future + if (existingQueueItem.hasSynced === true) { + // No remote action required, we can restore the record (avoids deletion) + attachmentUpdates.push({ + ...existingQueueItem, + state: AttachmentState.SYNCED + }); + } else { + // The localURI should be set if the record was meant to be downloaded + // and hasSynced is false then + // it must be an upload operation + const newState = + existingQueueItem.localUri == null ? AttachmentState.QUEUED_DOWNLOAD : AttachmentState.QUEUED_UPLOAD; + + attachmentUpdates.push({ + ...existingQueueItem, + state: newState + }); + } + } + } + + for (const attachment of currentAttachments) { + const notInWatchedItems = watchedAttachments.find((i) => i.id === attachment.id) == null; + if (notInWatchedItems) { + switch (attachment.state) { + case AttachmentState.QUEUED_DELETE: + case AttachmentState.QUEUED_UPLOAD: + // Only archive if it has synced + if (attachment.hasSynced === true) { + attachmentUpdates.push({ + ...attachment, + state: AttachmentState.ARCHIVED + }); + } + break; + default: + // Archive other states such as QUEUED_DOWNLOAD + attachmentUpdates.push({ + ...attachment, + state: AttachmentState.ARCHIVED + }); + } + } + } + + if (attachmentUpdates.length > 0) { + await this.context.saveAttachments(attachmentUpdates); + } + }); + } + + // Sync storage with all active attachments + async syncStorage(): Promise { + const activeAttachments = await this.context.getActiveAttachments(); + await this.localStorage.initialize(); + await this.storageService.processAttachments(activeAttachments); + await this.storageService.deleteArchivedAttachments(); + } + + async stopSync(): Promise { + clearInterval(this.periodicSyncTimer); + this.periodicSyncTimer = undefined; + this.watchActiveAbortController?.abort(); + } + + async saveFile({ + data, + fileExtension, + mediaType, + metaData, + id + }: { + data: ArrayBuffer | Blob | string; + fileExtension: string; + mediaType?: string; + metaData?: string; + id?: string; + }): Promise { + const resolvedId = id ?? (await this.context.db.get<{ id: string }>('SELECT uuid() as id')).id; + const filename = `${resolvedId}.${fileExtension}`; + const localUri = this.localStorage.getLocalUri(filename); + const size = await this.localStorage.saveFile(localUri, data); + + const attachment: AttachmentRecord = { + id: resolvedId, + filename, + mediaType, + localUri, + state: AttachmentState.QUEUED_UPLOAD, + hasSynced: false, + size, + timestamp: new Date().getTime(), + metaData + }; + + await this.context.db.writeTransaction(async (tx) => { + this.context.upsertAttachment(attachment, tx); + }); + + return attachment; + } + + verifyAttachments = async (): Promise => { + const attachments = await this.context.getAttachments(); + const updates: AttachmentRecord[] = []; + + for (const attachment of attachments) { + if (attachment.localUri == null) { + continue; + } + + const exists = await this.localStorage.fileExists(attachment.localUri); + if (exists) { + // The file exists, this is correct + continue; + } + + const newLocalUri = this.localStorage.getLocalUri(attachment.filename); + const newExists = await this.localStorage.fileExists(newLocalUri); + if (newExists) { + // The file exists but the localUri is broken, lets update it. + updates.push({ + ...attachment, + localUri: newLocalUri + }); + } else if (attachment.state === AttachmentState.QUEUED_UPLOAD || attachment.state === AttachmentState.ARCHIVED) { + // The file must have been removed from the local storage before upload was completed + updates.push({ + ...attachment, + state: AttachmentState.ARCHIVED, + localUri: undefined // Clears the value + }); + } else if (attachment.state === AttachmentState.SYNCED) { + // The file was downloaded, but removed - trigger redownload + updates.push({ + ...attachment, + state: AttachmentState.QUEUED_DOWNLOAD + }); + } + } + + await this.context.saveAttachments(updates); + }; +} diff --git a/packages/attachments/src/LocalStorageAdapter.ts b/packages/attachments/src/LocalStorageAdapter.ts new file mode 100644 index 000000000..73d707c09 --- /dev/null +++ b/packages/attachments/src/LocalStorageAdapter.ts @@ -0,0 +1,65 @@ +export enum EncodingType { + UTF8 = 'utf8', + Base64 = 'base64' +} + +export interface LocalStorageAdapter { + /** + * Saves buffer data to a local file. + * @param filePath Path where the file will be stored + * @param data Data string to store + * @returns Number of bytes written + */ + saveFile(filePath: string, data: ArrayBuffer | Blob | string): Promise; + + /** + * Retrieves an ArrayBuffer with the file data from the given path. + * @param filePath Path where the file is stored + * @returns ArrayBuffer with the file data + */ + readFile(filePath: string): Promise; + + /** + * Deletes the file at the given path. + * @param filePath Path where the file is stored + */ + deleteFile(filePath: string): Promise; + + /** + * Checks if a file exists at the given path. + * @param filePath Path where the file is stored + * @returns True if the file exists, false otherwise + */ + fileExists(filePath: string): Promise; + + /** + * Creates a directory at the specified path. + * @param path The full path to the directory + * @throws PowerSyncAttachmentError if creation fails + */ + makeDir(path: string): Promise; + + /** + * Removes a directory at the specified path. + * @param path The full path to the directory + * @throws PowerSyncAttachmentError if removal fails + */ + rmDir(path: string): Promise; + + /** + * Initializes the storage adapter (e.g., creating necessary directories). + */ + initialize(): Promise; + + /** + * Clears all files in the storage. + */ + clear(): Promise; + + /** + * Returns the file path of the provided filename in the user storage directory. + * @param filename The filename to get the path for + * @returns The full file path + */ + getLocalUri(filename: string): string; +} diff --git a/packages/attachments/src/RemoteStorageAdapter.ts b/packages/attachments/src/RemoteStorageAdapter.ts new file mode 100644 index 000000000..853aac4d0 --- /dev/null +++ b/packages/attachments/src/RemoteStorageAdapter.ts @@ -0,0 +1,27 @@ +import { AttachmentRecord } from "./Schema.js"; + +export interface RemoteStorageAdapter { + /** + * Uploads a file to remote storage. + * + * @param fileData The binary content of the file to upload. + * @param attachment The associated `Attachment` metadata describing the file. + * @throws An error if the upload fails. + */ + uploadFile(fileData: ArrayBuffer, attachment: AttachmentRecord): Promise; + /** + * Downloads a file from remote storage. + * + * @param attachment The `Attachment` describing the file to download. + * @returns The binary data of the downloaded file. + * @throws An error if the download fails or the file is not found. + */ + downloadFile(attachment: AttachmentRecord): Promise; + /** + * Deletes a file from remote storage. + * + * @param attachment The `Attachment` describing the file to delete. + * @throws An error if the deletion fails or the file does not exist. + */ + deleteFile(attachment: AttachmentRecord): Promise; +} \ No newline at end of file diff --git a/packages/attachments/src/Schema.ts b/packages/attachments/src/Schema.ts index 93811c1f4..b295cdfcf 100644 --- a/packages/attachments/src/Schema.ts +++ b/packages/attachments/src/Schema.ts @@ -1,46 +1,64 @@ -import { Column, ColumnType, Table, TableOptions } from '@powersync/common'; +import { column, Table, TableV2Options } from '@powersync/common'; export const ATTACHMENT_TABLE = 'attachments'; export interface AttachmentRecord { id: string; filename: string; - local_uri?: string; + localUri?: string; size?: number; - media_type?: string; + mediaType?: string; timestamp?: number; + metaData?: string; + hasSynced?: boolean; state: AttachmentState; } +// map from db to record +export function attachmentFromSql(row: any): AttachmentRecord { + return { + id: row.id, + filename: row.filename, + localUri: row.local_uri, + size: row.size, + mediaType: row.media_type, + timestamp: row.timestamp, + metaData: row.meta_data, + hasSynced: row.has_synced === 1, + state: row.state + }; +} + export enum AttachmentState { QUEUED_SYNC = 0, // Check if the attachment needs to be uploaded or downloaded QUEUED_UPLOAD = 1, // Attachment to be uploaded QUEUED_DOWNLOAD = 2, // Attachment to be downloaded - SYNCED = 3, // Attachment has been synced - ARCHIVED = 4 // Attachment has been orphaned, i.e. the associated record has been deleted + QUEUED_DELETE = 3, // Attachment to be deleted + SYNCED = 4, // Attachment has been synced + ARCHIVED = 5 // Attachment has been orphaned, i.e. the associated record has been deleted } -export interface AttachmentTableOptions extends Omit { - name?: string; - additionalColumns?: Column[]; -} +export interface AttachmentTableOptions extends Omit {} export class AttachmentTable extends Table { constructor(options?: AttachmentTableOptions) { - super({ - ...options, - name: options?.name ?? ATTACHMENT_TABLE, - localOnly: true, - insertOnly: false, - columns: [ - new Column({ name: 'filename', type: ColumnType.TEXT }), - new Column({ name: 'local_uri', type: ColumnType.TEXT }), - new Column({ name: 'timestamp', type: ColumnType.INTEGER }), - new Column({ name: 'size', type: ColumnType.INTEGER }), - new Column({ name: 'media_type', type: ColumnType.TEXT }), - new Column({ name: 'state', type: ColumnType.INTEGER }), // Corresponds to AttachmentState - ...(options?.additionalColumns ?? []) - ] - }); + super( + { + filename: column.text, + local_uri: column.text, + timestamp: column.integer, + size: column.integer, + media_type: column.text, + state: column.integer, // Corresponds to AttachmentState + has_synced: column.integer, + meta_data: column.text + }, + { + ...options, + viewName: options?.viewName ?? ATTACHMENT_TABLE, + localOnly: true, + insertOnly: false + } + ); } } diff --git a/packages/attachments/src/StorageAdapter.ts b/packages/attachments/src/StorageAdapter.ts deleted file mode 100644 index 96ed3ec01..000000000 --- a/packages/attachments/src/StorageAdapter.ts +++ /dev/null @@ -1,28 +0,0 @@ -export enum EncodingType { - UTF8 = 'utf8', - Base64 = 'base64' -} - -export interface StorageAdapter { - uploadFile(filePath: string, data: ArrayBuffer, options?: { mediaType?: string }): Promise; - - downloadFile(filePath: string): Promise; - - writeFile(fileUri: string, base64Data: string, options?: { encoding?: EncodingType }): Promise; - - readFile(fileUri: string, options?: { encoding?: EncodingType; mediaType?: string }): Promise; - - deleteFile(uri: string, options?: { filename?: string }): Promise; - - fileExists(fileUri: string): Promise; - - makeDir(uri: string): Promise; - - copyFile(sourceUri: string, targetUri: string): Promise; - - /** - * Returns the directory where user data is stored. - * Should end with a '/' - */ - getUserStorageDirectory(): string; -} diff --git a/packages/attachments/src/StorageService.ts b/packages/attachments/src/StorageService.ts new file mode 100644 index 000000000..ede346239 --- /dev/null +++ b/packages/attachments/src/StorageService.ts @@ -0,0 +1,157 @@ +import { ILogger } from '@powersync/common'; +import { AttachmentContext } from './AttachmentContext.js'; +import { EncodingType, LocalStorageAdapter } from './LocalStorageAdapter.js'; +import { RemoteStorageAdapter } from './RemoteStorageAdapter.js'; +import { AttachmentRecord, AttachmentState } from './Schema.js'; +import { SyncErrorHandler } from './SyncErrorHandler.js'; + +export class StorageService { + context: AttachmentContext; + localStorage: LocalStorageAdapter; + remoteStorage: RemoteStorageAdapter; + logger: ILogger; + errorHandler?: SyncErrorHandler; + + constructor( + context: AttachmentContext, + localStorage: LocalStorageAdapter, + remoteStorage: RemoteStorageAdapter, + logger: ILogger, + errorHandler?: SyncErrorHandler + ) { + this.context = context; + this.localStorage = localStorage; + this.remoteStorage = remoteStorage; + this.logger = logger; + this.errorHandler = errorHandler; + } + + async processAttachments(attachments: AttachmentRecord[]): Promise { + const updatedAttachments: AttachmentRecord[] = []; + for (const attachment of attachments) { + switch (attachment.state) { + case AttachmentState.QUEUED_UPLOAD: + const uploaded = await this.uploadAttachment(attachment); + updatedAttachments.push(uploaded); + break; + case AttachmentState.QUEUED_DOWNLOAD: + const downloaded = await this.downloadAttachment(attachment); + updatedAttachments.push(downloaded); + break; + case AttachmentState.QUEUED_DELETE: + const deleted = await this.deleteAttachment(attachment); + updatedAttachments.push(deleted); + break; + + default: + break; + } + } + + await this.context.saveAttachments(updatedAttachments); + } + + async uploadAttachment(attachment: AttachmentRecord): Promise { + this.logger.info(`Uploading attachment ${attachment.filename}`); + try { + if (attachment.localUri == null) { + throw new Error(`No localUri for attachment ${attachment.id}`); + } + + const fileBlob = await this.localStorage.readFile(attachment.localUri); + await this.remoteStorage.uploadFile(fileBlob, attachment); + + return { + ...attachment, + state: AttachmentState.SYNCED, + hasSynced: true + }; + } catch (error) { + const shouldRetry = this.errorHandler?.onUploadError(attachment, error) ?? false; + if (!shouldRetry) { + return { + ...attachment, + state: AttachmentState.ARCHIVED + }; + } + + return attachment; + } + } + + async downloadAttachment(attachment: AttachmentRecord): Promise { + try { + const fileBlob = await this.remoteStorage.downloadFile(attachment); + + const base64Data = await new Promise((resolve, reject) => { + const reader = new FileReader(); + reader.onloadend = () => { + // remove the header from the result: 'data:*/*;base64,' + resolve(reader.result?.toString().replace(/^data:.+;base64,/, '') || ''); + }; + reader.onerror = reject; + reader.readAsDataURL(fileBlob); + }); + + const localUri = this.localStorage.getLocalUri(attachment.filename); + await this.localStorage.saveFile(localUri, base64Data); + + return { + ...attachment, + state: AttachmentState.SYNCED, + localUri: localUri + }; + } catch (error) { + const shouldRetry = this.errorHandler?.onDownloadError(attachment, error) ?? false; + if (!shouldRetry) { + return { + ...attachment, + state: AttachmentState.ARCHIVED + }; + } + + return attachment; + } + } + + async deleteAttachment(attachment: AttachmentRecord): Promise { + try { + await this.remoteStorage.deleteFile(attachment); + if (attachment.localUri) { + await this.localStorage.deleteFile(attachment.localUri); + } + + await this.context.deleteAttachment(attachment.id); + + return { + ...attachment, + state: AttachmentState.QUEUED_DELETE, + localUri: null + }; + } catch (error) { + const shouldRetry = this.errorHandler?.onDeleteError(attachment, error) ?? false; + if (!shouldRetry) { + return { + ...attachment, + state: AttachmentState.ARCHIVED + }; + } + + return attachment; + } + } + + async deleteArchivedAttachments(): Promise { + const archivedAttachments = await this.context.getArchivedAttachments(); + for (const attachment of archivedAttachments) { + if (attachment.localUri) { + try { + await this.localStorage.deleteFile(attachment.localUri); + } catch (error) { + this.logger.error('Error deleting local file for archived attachment', error); + } + } + await this.context.deleteAttachment(attachment.id); + } + } +} diff --git a/packages/attachments/src/SyncErrorHandler.ts b/packages/attachments/src/SyncErrorHandler.ts new file mode 100644 index 000000000..d71ea7db1 --- /dev/null +++ b/packages/attachments/src/SyncErrorHandler.ts @@ -0,0 +1,28 @@ +import { AttachmentRecord } from './Schema.js'; + +/// If an operation fails and should not be retried, the attachment record is archived. +export abstract class SyncErrorHandler { + /** + * Handles a download error for a specific attachment. + * @param attachment The `Attachment` that failed to be downloaded. + * @param error The error encountered during the download operation. + * @returns `true` if the operation should be retried, `false` if it should be archived. + */ + abstract onDownloadError(attachment: AttachmentRecord, error: Error): Promise; + + /** + * Handles an upload error for a specific attachment. + * @param attachment The `Attachment` that failed to be uploaded. + * @param error The error encountered during the upload operation. + * @returns `true` if the operation should be retried, `false` if it should be archived. + */ + abstract onUploadError(attachment: AttachmentRecord, error: Error): Promise; + + /** + * Handles a delete error for a specific attachment. + * @param attachment The `Attachment` that failed to be deleted. + * @param error The error encountered during the delete operation. + * @returns `true` if the operation should be retried, `false` if it should be archived. + */ + abstract onDeleteError(attachment: AttachmentRecord, error: Error): Promise; +} diff --git a/packages/attachments/src/WatchedAttachmentItem.ts b/packages/attachments/src/WatchedAttachmentItem.ts new file mode 100644 index 000000000..70fefea44 --- /dev/null +++ b/packages/attachments/src/WatchedAttachmentItem.ts @@ -0,0 +1,14 @@ +// A watched attachment record item. +export type WatchedAttachmentItem = + | { + id: string; + filename: string; + fileExtension?: never; + metaData?: string; + } + | { + id: string; + fileExtension: string; + filename?: never; + metaData?: string; + }; diff --git a/packages/attachments/src/index.ts b/packages/attachments/src/index.ts index 04eee58ed..7174fa7ca 100644 --- a/packages/attachments/src/index.ts +++ b/packages/attachments/src/index.ts @@ -1,4 +1,8 @@ export * from './Schema.js'; -export * from './StorageAdapter.js'; - -export * from './AbstractAttachmentQueue.js'; +export * from './LocalStorageAdapter.js'; +export * from './storageAdapters/NodeFileSystemAdapter.js'; +export * from './storageAdapters/IndexDBFileSystemAdapter.js'; +export * from './RemoteStorageAdapter.js'; +export * from './AttachmentContext.js'; +export * from './StorageService.js'; +export * from './AttachmentQueue.js'; diff --git a/packages/attachments/src/storageAdapters/IndexDBFileSystemAdapter.ts b/packages/attachments/src/storageAdapters/IndexDBFileSystemAdapter.ts new file mode 100644 index 000000000..7de9b1aa0 --- /dev/null +++ b/packages/attachments/src/storageAdapters/IndexDBFileSystemAdapter.ts @@ -0,0 +1,122 @@ +import { EncodingType, LocalStorageAdapter } from '../LocalStorageAdapter.js'; + +export class IndexDBFileSystemStorageAdapter implements LocalStorageAdapter { + private dbPromise: Promise; + + async initialize(): Promise { + this.dbPromise = new Promise((resolve, reject) => { + const request = indexedDB.open('PowerSyncFiles', 1); + request.onupgradeneeded = () => { + request.result.createObjectStore('files'); + }; + request.onsuccess = () => resolve(request.result); + request.onerror = () => reject(request.error); + }); + } + + clear(): Promise { + return new Promise(async (resolve, reject) => { + const db = await this.dbPromise; + const tx = db.transaction('files', 'readwrite'); + const store = tx.objectStore('files'); + const req = store.clear(); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); + } + + getLocalUri(filename: string): string { + return `indexeddb://PowerSyncFiles/files/${filename}`; + } + + private async getStore(mode: IDBTransactionMode = 'readonly'): Promise { + const db = await this.dbPromise; + const tx = db.transaction('files', mode); + return tx.objectStore('files'); + } + + async saveFile(filePath: string, data: string): Promise { + const store = await this.getStore('readwrite'); + return await new Promise((resolve, reject) => { + const req = store.put(data, filePath); + req.onsuccess = () => resolve(data.length); + req.onerror = () => reject(req.error); + }); + } + + async downloadFile(filePath: string): Promise { + const store = await this.getStore(); + return new Promise((resolve, reject) => { + const req = store.get(filePath); + req.onsuccess = () => { + if (req.result) { + resolve(new Blob([req.result])); + } else { + reject(new Error('File not found')); + } + }; + req.onerror = () => reject(req.error); + }); + } + + async readFile(fileUri: string, options?: { encoding?: EncodingType; mediaType?: string }): Promise { + const store = await this.getStore(); + return new Promise((resolve, reject) => { + const req = store.get(fileUri); + req.onsuccess = async () => { + if (!req.result) { + reject(new Error('File not found')); + return; + } + + // if (options?.encoding === EncodingType.Base64) { + // } + + if (options?.encoding === EncodingType.UTF8) { + const encoder = new TextEncoder(); + const arrayBuffer = encoder.encode(req.result).buffer; + resolve(arrayBuffer); + } + + // Default base64 encoding + const base64String = req.result.replace(/^data:\w+;base64,/, ''); + const binaryString = atob(base64String); + const len = binaryString.length; + const bytes = new Uint8Array(len); + for (let i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + resolve(bytes.buffer); + + // reject(new Error('Unsupported encoding')); + }; + req.onerror = () => reject(req.error); + }); + } + + async deleteFile(uri: string, options?: { filename?: string }): Promise { + const store = await this.getStore('readwrite'); + await new Promise((resolve, reject) => { + const req = store.delete(uri); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); + } + + async fileExists(fileUri: string): Promise { + const store = await this.getStore(); + return new Promise((resolve, reject) => { + const req = store.get(fileUri); + req.onsuccess = () => resolve(!!req.result); + req.onerror = () => reject(req.error); + }); + } + + async makeDir(path: string): Promise { + // No-op for IndexedDB + } + + async rmDir(path: string): Promise { + // No-op for IndexedDB + } +} diff --git a/packages/attachments/src/storageAdapters/NodeFileSystemAdapter.ts b/packages/attachments/src/storageAdapters/NodeFileSystemAdapter.ts new file mode 100644 index 000000000..9ce26ea67 --- /dev/null +++ b/packages/attachments/src/storageAdapters/NodeFileSystemAdapter.ts @@ -0,0 +1,79 @@ +import { promises as fs } from 'fs'; +import * as path from 'path'; +import { EncodingType, LocalStorageAdapter } from '../LocalStorageAdapter.js'; + +export class NodeFileSystemAdapter implements LocalStorageAdapter { + async initialize(): Promise { + // const dir = this.getUserStorageDirectory(); + const dir = path.resolve('./user_data'); + await fs.mkdir(dir, { recursive: true }); + } + + async clear(): Promise { + // const dir = this.getUserStorageDirectory(); + const dir = path.resolve('./user_data'); + await fs.rmdir(dir, { recursive: true }); + } + + getLocalUri(filename: string): string { + return path.join(path.resolve('./user_data'), filename); + } + + async uploadFile(filePath: string, data: ArrayBuffer, options?: { encoding: EncodingType }): Promise { + const buffer = Buffer.from(data); + await fs.writeFile(filePath, buffer, { + encoding: options.encoding + }); + } + + async downloadFile(filePath: string): Promise { + const data = await fs.readFile(filePath); + return new Blob([new Uint8Array(data)]); + } + + async saveFile( + filePath: string, + data: string, + options?: { encoding?: EncodingType; mediaType?: string } + ): Promise { + const buffer = options?.encoding === EncodingType.Base64 ? Buffer.from(data, 'base64') : Buffer.from(data, 'utf8'); + await fs.writeFile(filePath, buffer, { + encoding: options?.encoding + }); + return buffer.length; + } + + async readFile(filePath: string, options?: { encoding?: EncodingType; mediaType?: string }): Promise { + const data = await fs.readFile(filePath); + if (options?.encoding === EncodingType.Base64) { + return Buffer.from(data.toString(), 'base64').buffer; + } else { + return data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength) as ArrayBuffer; + } + } + + async deleteFile(path: string, options?: { filename?: string }): Promise { + await fs.unlink(path).catch((err) => { + if (err.code !== 'ENOENT') { + throw err; + } + }); + } + + async fileExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch { + return false; + } + } + + async makeDir(path: string): Promise { + await fs.mkdir(path, { recursive: true }); + } + + async rmDir(path: string): Promise { + await fs.rmdir(path, { recursive: true }); + } +} diff --git a/packages/attachments/tests/attachments.test.ts b/packages/attachments/tests/attachments.test.ts new file mode 100644 index 000000000..41f049102 --- /dev/null +++ b/packages/attachments/tests/attachments.test.ts @@ -0,0 +1,202 @@ +import { describe, expect, it, vi } from 'vitest'; +import { PowerSyncDatabase, Schema, Table, column } from '@powersync/web'; +import { AbstractPowerSyncDatabase } from '@powersync/common'; +import { AttachmentQueue } from '../src/AttachmentQueue.js'; +import { AttachmentState, AttachmentTable } from '../src/Schema.js'; +import { RemoteStorageAdapter } from '../src/RemoteStorageAdapter.js'; +import { WatchedAttachmentItem } from '../src/WatchedAttachmentItem.js'; +import { IndexDBFileSystemStorageAdapter } from '../src/storageAdapters/IndexDBFileSystemAdapter.js'; + +const mockRemoteStorage: RemoteStorageAdapter = { + downloadFile: (attachment) => { + return Promise.resolve(new Blob(['_BASE64_DATA'], { type: 'image/jpeg' })); + }, + uploadFile: vi.fn(), + deleteFile: vi.fn() +}; + +const watchAttachments = (onUpdate: (attachments: WatchedAttachmentItem[]) => void) => { + db.watch( + /* sql */ + ` + SELECT + photo_id + FROM + users + WHERE + photo_id IS NOT NULL + `, + [], + { + onResult: (result: any) => + onUpdate( + result.rows?._array.map((r: any) => ({ + id: r.photo_id, + fileExtension: 'jpg' + })) ?? [] + ) + } + ); +}; + +let db: AbstractPowerSyncDatabase; + +beforeAll(async () => { + db = new PowerSyncDatabase({ + schema: new Schema({ + users: new Table({ + name: column.text, + email: column.text, + photo_id: column.text + }), + attachments: new AttachmentTable() + }), + database: { + dbFilename: 'example.db' + } + }); + + await db.disconnectAndClear(); +}); + +afterAll(async () => { + await db.disconnectAndClear(); +}); + +describe('attachment queue', () => { + it('should download attachments when a new record with an attachment is added', async () => { + const queue = new AttachmentQueue({ + db: db, + watchAttachments, + remoteStorage: mockRemoteStorage, + localStorage: new IndexDBFileSystemStorageAdapter(), + }); + + await queue.startSync(); + + await db.execute( + /* sql */ + ` + INSERT INTO + users (id, name, email, photo_id) + VALUES + ( + uuid (), + 'example', + 'example@example.com', + uuid () + ) + `, + [] + ); + + const attachmentRecords = await waitForMatch( + () => + db.watch( + /* sql */ + ` + SELECT + * + FROM + attachments + `, + [] + ), + (results) => { + return results?.rows?._array.some((r: any) => r.state === AttachmentState.SYNCED); + }, + 5 + ); + + const attachmentRecord = attachmentRecords.rows._array.at(0); + + const localData = await queue.localStorage.readFile(attachmentRecord.local_uri!); + const localDataString = new TextDecoder().decode(localData); + expect(localDataString).toBe('_BASE64_DATA'); + + await queue.stopSync(); + }); +}); + +async function waitForMatch( + iteratorGenerator: () => AsyncIterable, + predicate: (value: any) => boolean, + timeout: number +) { + const timeoutMs = timeout * 1000; + const abortController = new AbortController(); + + const matchPromise = (async () => { + const asyncIterable = iteratorGenerator(); + try { + for await (const value of asyncIterable) { + if (abortController.signal.aborted) { + throw new Error('Timeout'); + } + if (predicate(value)) { + return value; + } + } + throw new Error('Stream ended without match'); + } finally { + const iterator = asyncIterable[Symbol.asyncIterator](); + if (iterator.return) { + await iterator.return(); + } + } + })(); + + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => { + abortController.abort(); + reject(new Error('Timeout')); + }, timeoutMs) + ); + + return Promise.race([matchPromise, timeoutPromise]); +} + +// describe('attachments', () => { +// beforeEach(() => { +// vi.clearAllMocks(); +// }); + +// it('should not download attachments when downloadRecord is called with downloadAttachments false', async () => { +// const queue = new AttachmentQueue({ +// db: mockPowerSync as any, +// watchAttachments: watchAttachments, +// remoteStorage: mockRemoteStorage, +// localStorage: mockLocalStorage +// }); + +// await queue.saveFile; + +// expect(mockLocalStorage.downloadFile).not.toHaveBeenCalled(); +// }); + +// it('should download attachments when downloadRecord is called with downloadAttachments true', async () => { +// const queue = new TestAttachmentQueue({ +// powersync: mockPowerSync as any, +// storage: mockLocalStorage, +// downloadAttachments: true +// }); + +// await queue.downloadRecord(record); + +// expect(mockLocalStorage.downloadFile).toHaveBeenCalled(); +// }); + +// // Testing the inverse of this test, i.e. when downloadAttachments is false, is not required as you can't wait for something that does not happen +// it('should not download attachments with watchDownloads is called with downloadAttachments false', async () => { +// const queue = new TestAttachmentQueue({ +// powersync: mockPowerSync as any, +// storage: mockLocalStorage, +// downloadAttachments: true +// }); + +// queue.watchDownloads(); +// await vi.waitFor(() => { +// expect(mockLocalStorage.downloadFile).toBeCalledTimes(2); +// }); +// }); +// }); diff --git a/packages/attachments/tests/attachments/AttachmentQueue.test.ts b/packages/attachments/tests/attachments/AttachmentQueue.test.ts deleted file mode 100644 index 69e8c1593..000000000 --- a/packages/attachments/tests/attachments/AttachmentQueue.test.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { beforeEach, describe, expect, it, vi } from 'vitest'; -import { AbstractAttachmentQueue } from '../../src/AbstractAttachmentQueue.js'; -import { AttachmentRecord, AttachmentState } from '../../src/Schema.js'; -import { StorageAdapter } from '../../src/StorageAdapter.js'; - -const record = { - id: 'test-1', - filename: 'test.jpg', - state: AttachmentState.QUEUED_DOWNLOAD -}; - -const mockPowerSync = { - currentStatus: { status: 'initial' }, - registerListener: vi.fn(() => {}), - resolveTables: vi.fn(() => ['table1', 'table2']), - onChangeWithCallback: vi.fn(), - getAll: vi.fn(() => Promise.resolve([{ id: 'test-1' }, { id: 'test-2' }])), - execute: vi.fn(() => Promise.resolve()), - getOptional: vi.fn((_query, params) => Promise.resolve(record)), - watch: vi.fn((query, params, callbacks) => { - callbacks?.onResult?.({ rows: { _array: [{ id: 'test-1' }, { id: 'test-2' }] } }); - }), - writeTransaction: vi.fn(async (callback) => { - await callback({ - execute: vi.fn(() => Promise.resolve()) - }); - }) -}; - -const mockStorage: StorageAdapter = { - downloadFile: vi.fn(), - uploadFile: vi.fn(), - deleteFile: vi.fn(), - writeFile: vi.fn(), - readFile: vi.fn(), - fileExists: vi.fn(), - makeDir: vi.fn(), - copyFile: vi.fn(), - getUserStorageDirectory: vi.fn() -}; - -class TestAttachmentQueue extends AbstractAttachmentQueue { - onAttachmentIdsChange(onUpdate: (ids: string[]) => void): void { - throw new Error('Method not implemented.'); - } - newAttachmentRecord(record?: Partial): Promise { - throw new Error('Method not implemented.'); - } -} - -describe('attachments', () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it('should not download attachments when downloadRecord is called with downloadAttachments false', async () => { - const queue = new TestAttachmentQueue({ - powersync: mockPowerSync as any, - storage: mockStorage, - downloadAttachments: false - }); - - await queue.downloadRecord(record); - - expect(mockStorage.downloadFile).not.toHaveBeenCalled(); - }); - - it('should download attachments when downloadRecord is called with downloadAttachments true', async () => { - const queue = new TestAttachmentQueue({ - powersync: mockPowerSync as any, - storage: mockStorage, - downloadAttachments: true - }); - - await queue.downloadRecord(record); - - expect(mockStorage.downloadFile).toHaveBeenCalled(); - }); - - // Testing the inverse of this test, i.e. when downloadAttachments is false, is not required as you can't wait for something that does not happen - it('should not download attachments with watchDownloads is called with downloadAttachments false', async () => { - const queue = new TestAttachmentQueue({ - powersync: mockPowerSync as any, - storage: mockStorage, - downloadAttachments: true - }); - - queue.watchDownloads(); - await vi.waitFor(() => { - expect(mockStorage.downloadFile).toBeCalledTimes(2); - }); - }); -}); diff --git a/packages/attachments/tsconfig.json b/packages/attachments/tsconfig.json index 04be1f2f0..9c31f1952 100644 --- a/packages/attachments/tsconfig.json +++ b/packages/attachments/tsconfig.json @@ -13,5 +13,5 @@ "path": "../common" } ], - "include": ["src/**/*"] + "include": ["src/**/*", "tests/**/*", "package.json"] } diff --git a/packages/attachments/vitest.config.ts b/packages/attachments/vitest.config.ts index d8f9fd351..e26633c68 100644 --- a/packages/attachments/vitest.config.ts +++ b/packages/attachments/vitest.config.ts @@ -1,8 +1,19 @@ import topLevelAwait from 'vite-plugin-top-level-await'; +import wasm from 'vite-plugin-wasm'; import { defineConfig, UserConfigExport } from 'vitest/config'; const config: UserConfigExport = { - plugins: [topLevelAwait()], + worker: { + format: 'es', + plugins: () => [wasm(), topLevelAwait()] + }, + optimizeDeps: { + // Don't optimise these packages as they contain web workers and WASM files. + // https://github.com/vitejs/vite/issues/11672#issuecomment-1415820673 + exclude: ['@journeyapps/wa-sqlite', '@powersync/web'], + include: ['async-mutex', 'comlink', 'bson'] + }, + plugins: [wasm(), topLevelAwait()], test: { isolate: false, globals: true,