From a0f39375c052984a65c511bc2d4ae7995cacab16 Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Tue, 30 Jul 2024 13:21:25 -0700 Subject: [PATCH 1/3] Remove InfoClient usage for SyncClient instances --- CHANGELOG.md | 2 ++ src/client/sync-client.ts | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 523816a..0caa299 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- removed: InfoClient usage for SyncClient instances + ## 0.2.9 (2024-02-26) - added: Detect conflicts while creating repos, and report these with a new `ConflictError` type.
diff --git a/src/client/sync-client.ts b/src/client/sync-client.ts index e8f47f0..e2f568e 100644 --- a/src/client/sync-client.ts +++ b/src/client/sync-client.ts @@ -17,7 +17,19 @@ import { } from '../types/rest-types' import { syncKeyToRepoId } from '../util/security' import { shuffle } from '../util/shuffle' -import { makeInfoClient } from './info-client' + +const defaultEdgeServers: Required<EdgeServers> = { + infoServers: ['https://info-eu1.edge.app', 'https://info-us1.edge.app'], + syncServers: [ + 'https://sync-us1.edge.app', + 'https://sync-us2.edge.app', + 'https://sync-us3.edge.app', + 'https://sync-us4.edge.app', + 'https://sync-us5.edge.app', + 'https://sync-us6.edge.app', + 'https://sync-eu.edge.app' + ] +} export interface SyncClient { createRepo: (syncKey: string, apiKey?: string) => Promise<PutStoreResponse> @@ -40,11 +52,11 @@ export interface SyncClientOptions { export function makeSyncClient(opts: SyncClientOptions = {}): SyncClient { const { fetch = crossFetch, log = () => {} } = opts - const infoClient = makeInfoClient(opts) + const syncServers: Required<EdgeServers>['syncServers'] = + opts.edgeServers?.syncServers ?? defaultEdgeServers.syncServers // Returns the sync servers shuffled async function shuffledSyncServers(): Promise<string[]> { - const { syncServers } = await infoClient.getEdgeServers() return shuffle(syncServers) }
From 6a2a0441cc37845c6b83d70946f14a229bbcda5e Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Tue, 30 Jul 2024 14:00:31 -0700 Subject: [PATCH 2/3] v0.2.9-1 --- CHANGELOG.md | 2 ++ package.json | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0caa299..824b196 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +## 0.2.9-1 (2024-07-30) + - removed: InfoClient usage for SyncClient instances ## 0.2.9 (2024-02-26)
diff --git a/package.json b/package.json index f2a7233..0d48bf5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "edge-sync-client", - "version": "0.2.9", + "version": "0.2.9-1", "description": "Library for accessing the Edge data-sync system", "keywords": [ "Edge"
From eb2a5cef4957d80f49481e427227a37b29b07928 Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Wed, 3 Dec 2025 16:46:39 -0800 Subject: [PATCH 3/3] Implement Disklet synchronization --- CHANGELOG.md | 2 + package.json | 4 + rollup.config.js | 6 +- src/client/sync-client.ts | 147 +++++++- src/index.flow.js | 55 ++- src/index.ts | 7 +- src/types/base-types.ts | 33 +- test/component/sync-client.test.ts | 573 +++++++++++++++++++++++++++++ test/utils/mock-sync-server.ts | 185 ++++++++++ yarn.lock | 12 + 10 files changed, 997 insertions(+), 27 deletions(-) create mode 100644 test/component/sync-client.test.ts create mode 100644 test/utils/mock-sync-server.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 824b196..df01dab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ##
Unreleased +- added: `syncRepo(disklet, syncKey, lastHash)` method on `SyncClient` for Disklet-based repository synchronization, staging outgoing edits in `changes/` and `deleted/` directories and applying server state to `data/` + ## 0.2.9-1 (2024-07-30) - removed: InfoClient usage for SyncClient instances
diff --git a/package.json b/package.json index 0d48bf5..e981721 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,9 @@ "rfc4648": "^1.5.0", "serverlet": "^0.1.0" }, + "peerDependencies": { + "disklet": ">=0.5.0" + }, "devDependencies": { "@babel/core": "^7.10.4", "@rollup/plugin-node-resolve": "^8.1.0", @@ -71,6 +74,7 @@ "@typescript-eslint/parser": "^4.8.2", "babel-eslint": "^10.1.0", "chai": "^4.2.0", + "disklet": "^0.5.2", "eslint": "^7.14.0", "eslint-config-standard-kit": "0.15.1", "eslint-plugin-flowtype": "^5.2.0",
diff --git a/rollup.config.js b/rollup.config.js index 4f349a5..6f6b37d 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -8,7 +8,11 @@ import packageJson from './package.json' const extensions = ['.ts'] export default { - external: [/@babel\/runtime/, ...Object.keys(packageJson.dependencies)], + external: [ + /@babel\/runtime/, + ...Object.keys(packageJson.dependencies), + ...Object.keys(packageJson.peerDependencies || {}) + ], input: 'src/index.ts', output: [ { file: packageJson.main, format: 'cjs' },
diff --git a/src/client/sync-client.ts b/src/client/sync-client.ts index e2f568e..846440a 100644 --- a/src/client/sync-client.ts +++ b/src/client/sync-client.ts @@ -1,8 +1,9 @@ import { asMaybe, Cleaner, uncleaner } from 'cleaners' import crossFetch from 'cross-fetch' +import { Disklet, navigateDisklet } from 'disklet' import { FetchFunction, FetchResponse } from 'serverlet' -import { EdgeServers } from '../types/base-types' +import { asEdgeBox, EdgeServers, wasEdgeBox } from '../types/base-types' import { ConflictError } from '../types/error' import { asGetStoreResponse, @@ -10,6 +11,7 @@ import { asPostStoreResponse, asPutStoreResponse, asServerErrorResponse, + ChangeSet, GetStoreResponse, PostStoreBody, PostStoreResponse, @@ -42,16 +44,42 @@ export interface SyncClient { lastHash: string | undefined, body: PostStoreBody ) => Promise<PostStoreResponse> + /** + * Sync the repository using the provided Disklet. + * Gathers changes from the `changes/` directory and deletions from the `deleted/` directory, + * sends them to the server, applies the server's response to `data/`, and clears the staging directories. + */ + syncRepo: ( + disklet: Disklet, + syncKey: string, + lastHash: string | undefined + ) => Promise<SyncResult> +} + +export interface SyncResult { + /** The sync status after sync */ + status: SyncStatus + /** The changes received from the server */ + changes: ChangeSet +} + +export interface SyncStatus { + /** The last known hash from the server */ + lastHash: string | undefined + /** Unix timestamp of the last sync (seconds) */ + lastSync: number } export interface SyncClientOptions { fetch?: FetchFunction log?: (message: string) => void edgeServers?: EdgeServers + /** Maximum number of changes to send per sync (default: 100) */ + maxChangesPerSync?: number } export function makeSyncClient(opts: SyncClientOptions = {}): SyncClient { - const { fetch = crossFetch, log = () => {} } = opts + const { fetch = crossFetch, log = () => {}, maxChangesPerSync = 100 } = opts const syncServers: Required<EdgeServers>['syncServers'] = opts.edgeServers?.syncServers ??
defaultEdgeServers.syncServers @@ -182,6 +210,94 @@ export function makeSyncClient(opts: SyncClientOptions = {}): SyncClient { } throw error + }, + + async syncRepo(disklet, syncKey, lastHash) { + // Get subdisklets for changes, deletions, and data + const changesDisklet = navigateDisklet(disklet, 'changes') + const deletedDisklet = navigateDisklet(disklet, 'deleted') + const dataDisklet = navigateDisklet(disklet, 'data') + + // List both directories (each with limit to avoid over-listing) + const allChangePaths = await deepListWithLimit( + changesDisklet, + maxChangesPerSync + ) + const allDeletePaths = await deepListWithLimit( + deletedDisklet, + maxChangesPerSync + ) + + // Interlace changes and deletions, respecting the limit + const changePaths: string[] = [] + const deletePaths: string[] = [] + const outgoingChanges: ChangeSet = {} + const maxChangesCount = Math.min( + maxChangesPerSync, + allChangePaths.length + allDeletePaths.length + ) + for (let i = 0; i < maxChangesCount; i++) { + const pickChange = async (): Promise<void> => { + const path = allChangePaths[changePaths.length] + const data = await changesDisklet.getText(path) + outgoingChanges[path] = asEdgeBox(JSON.parse(data)) + changePaths.push(path) + } + const pickDeletion = (): void => { + const path = allDeletePaths[deletePaths.length] + outgoingChanges[path] = null + deletePaths.push(path) + } + + if (i % 2 === 0) { + if (changePaths.length < allChangePaths.length) { + await pickChange() + } else { + pickDeletion() + } + } else { + if (deletePaths.length < allDeletePaths.length) { + pickDeletion() + } else { + await pickChange() + } + } + } + + // Use readRepo if no changes, updateRepo otherwise + const hasChanges = Object.keys(outgoingChanges).length > 0 + const response = hasChanges + ? await this.updateRepo(syncKey, lastHash, { changes: outgoingChanges }) + : await this.readRepo(syncKey, lastHash) + + // Apply server's changes to the data disklet + for (const [path, change] of Object.entries(response.changes)) { + if (change === null) { + // Delete the file from data directory + await dataDisklet.delete(path) + } else { + // Write the file to data directory + await dataDisklet.setText(path, JSON.stringify(wasEdgeBox(change))) + } + } + + // Clear synced changes from changes/ directory + for (const path of changePaths) { + await changesDisklet.delete(path) + } + + // Clear synced deletions from deleted/ directory + for (const path of deletePaths) { + await deletedDisklet.delete(path) + } + + return { + status: { + lastHash: response.hash ?? lastHash, + lastSync: Date.now() / 1000 + }, + changes: response.changes + } } } } interface ApiRequest { } const wasPostStoreBody = uncleaner(asPostStoreBody) + +// Disklet helper functions + +/** + * Lists all files in a disklet recursively, up to a limit. + * Returns a list of full paths.
+ */ + +async function deepListWithLimit( + disklet: Disklet, + limit: number, + path: string = '' +): Promise<string[]> { + const list = await disklet.list(path) + const paths = Object.keys(list).filter(path => list[path] === 'file') + const folders = Object.keys(list).filter(path => list[path] === 'folder') + + // Loop over folders to get subpaths + for (const folder of folders) { + if (paths.length >= limit) break + const remaining = limit - paths.length + const subpaths = await deepListWithLimit(disklet, remaining, folder) + paths.push(...subpaths.slice(0, remaining)) + } + + return paths +}
diff --git a/src/index.flow.js b/src/index.flow.js index b794173..11c0540 100644 --- a/src/index.flow.js +++ b/src/index.flow.js @@ -1,5 +1,6 @@ // @flow +import { type Disklet } from 'disklet' import { type FetchFunction } from 'serverlet' type EdgeServers = { @@ -7,42 +8,51 @@ type EdgeServers = { syncServers?: string[] } -type SyncClientOptions = { - fetch?: FetchFunction, - log?: (message: string) => void, - edgeServers?: EdgeServers -} - type EdgeBox = { - iv_hex: string, encryptionType: number, - data_base64: string + data_base64: string, + iv_hex: string +} + +type ChangeSet = { + [path: string]: EdgeBox | null } type PutStoreResponse = void type GetStoreResponse = { hash?: string | void, - changes: { - [keys: string]: EdgeBox | null - } + changes: ChangeSet } type PostStoreBody = { - changes: { - [keys: string]: EdgeBox | null - } + changes: ChangeSet } type PostStoreResponse = { hash: string, - changes: { - [keys: string]: EdgeBox | null - } + changes: ChangeSet +} + +export type SyncStatus = { + lastHash: string | void, + lastSync: number +} + +export type SyncResult = { + status: SyncStatus, + changes: ChangeSet +} + +export type SyncClientOptions = { + fetch?: FetchFunction, + log?: (message: string) => void, + edgeServers?: EdgeServers, + maxChangesPerSync?: number } export type SyncClient = { - createRepo: (syncKey: string) => Promise<PutStoreResponse>, + createRepo: (syncKey: string, apiKey?: string) => Promise<PutStoreResponse>, readRepo: ( syncKey: string, lastHash: string | void ) => Promise<GetStoreResponse>, updateRepo: ( syncKey: string, lastHash: string | void, body: PostStoreBody - ) => Promise<PostStoreResponse> + ) => Promise<PostStoreResponse>, + syncRepo: ( + disklet: Disklet, + syncKey: string, + lastHash: string | void + ) => Promise<SyncResult> } -declare export function makeSyncClient(opts: SyncClientOptions): SyncClient +declare export function makeSyncClient(opts?: SyncClientOptions): SyncClient
diff --git a/src/index.ts b/src/index.ts index db9057c..d949814 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,12 @@ export * from './types/rest-types' export { asMaybeConflictError, ConflictError } from './types/error' // Client -export type { SyncClient, SyncClientOptions } from './client/sync-client' +export type { + SyncStatus, + SyncClient, + SyncClientOptions, + SyncResult +} from './client/sync-client' export { makeSyncClient } from './client/sync-client' // Util
diff --git a/src/types/base-types.ts b/src/types/base-types.ts index 982dd60..cd8e235 100644 --- a/src/types/base-types.ts +++ b/src/types/base-types.ts @@ -1,4 +1,14 @@ -import { asArray, asNumber, asObject, asOptional, asString } from 'cleaners' +import { + asArray, + asCodec, + asNumber, + asObject, + asOptional, + asString, + Cleaner, + uncleaner +} from 'cleaners' +import { base16, base64 } from 'rfc4648' import { normalizePath } from '../util/paths' import { VALID_PATH_REGEX, VALID_SYNC_KEY_REGEX } from '../util/regex' @@ -46,9 +56,26 @@ export const asSyncKey = (raw: any): string => {
return syncKey } +/** + * A string of hex-encoded binary data. + */ +export const asBase16: Cleaner<Uint8Array> = asCodec( + raw => base16.parse(asString(raw)), + clean => base16.stringify(clean).toLowerCase() +) + +/** + * A string of base64-encoded binary data. + */ +export const asBase64: Cleaner<Uint8Array> = asCodec( + raw => base64.parse(asString(raw)), + clean => base64.stringify(clean) +) + export type EdgeBox = ReturnType<typeof asEdgeBox> export const asEdgeBox = asObject({ - iv_hex: asString, encryptionType: asNumber, - data_base64: asString + data_base64: asBase64, + iv_hex: asBase16 }) +export const wasEdgeBox = uncleaner(asEdgeBox)
diff --git a/test/component/sync-client.test.ts b/test/component/sync-client.test.ts new file mode 100644 index 0000000..5890a65 --- /dev/null +++ b/test/component/sync-client.test.ts @@ -0,0 +1,573 @@ +import { expect } from 'chai' +import { Disklet, makeMemoryDisklet, navigateDisklet } from 'disklet' + +import { makeSyncClient, SyncClient } from '../../src/client/sync-client' +import { wasEdgeBox } from '../../src/types/base-types' +import { createMockSyncServer, MockSyncServer } from '../utils/mock-sync-server' + +// Test constants +const TEST_SYNC_KEY = + '0000000000000000000000000000000000000000000000000000000000000000' +const TEST_SERVER = 'http://test-sync-server' + +// Helper to create EdgeBox in wire format (for mock server responses and disk writes) +function makeWireEdgeBox( + id: number +): { + encryptionType: number + iv_hex: string + data_base64: string +} { + return wasEdgeBox({ + encryptionType: 0, + iv_hex: new Uint8Array([id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), + data_base64: new Uint8Array([id, 1, 2, 3]) + }) as { + encryptionType: number + iv_hex: string + data_base64: string + } +} + +describe('Component: SyncClient.syncRepo', () => { + // Shared test context + let disklet: ReturnType<typeof makeMemoryDisklet> + let changesDisklet: Disklet + let deletedDisklet: Disklet + let dataDisklet: Disklet + let mockSyncServer: MockSyncServer + let client: SyncClient + + // Helper to create client with custom options + function createClient(opts: { maxChangesPerSync?: number } = {}): SyncClient { + return makeSyncClient({ + fetch: mockSyncServer.fetch, + edgeServers: { syncServers: [TEST_SERVER] }, + ...opts + }) + } + + beforeEach(() => { + disklet = makeMemoryDisklet() + changesDisklet = navigateDisklet(disklet, 'changes') + deletedDisklet = navigateDisklet(disklet, 'deleted') + dataDisklet = navigateDisklet(disklet, 'data') + mockSyncServer = createMockSyncServer() + client = createClient() + }) + + // --- Setup & Configuration --- + + describe('setup', () => { + it('uses maxChangesPerSync option to limit changes per sync (default 100)', async () => { + // Create 150 files in changes/ and track expected values + const expectedBoxes: Record< + string, + ReturnType<typeof makeWireEdgeBox> + > = {} + for (let i = 0; i < 150; i++) { + const box = makeWireEdgeBox(i) + expectedBoxes[`file${i}.json`] = box + await changesDisklet.setText(`file${i}.json`, JSON.stringify(box)) + } + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify only 100 files in repo state with correct content + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + const serverFiles = Object.keys(repoState?.files ??
{}) expect(serverFiles).to.have.length(100) + + // Verify each synced file has correct content + for (const fileName of serverFiles) { + expect(repoState?.files[fileName]).to.deep.equal( + expectedBoxes[fileName] + ) + } + + // Verify 50 files remain in changes/ (the ones not synced) + const remainingFiles = Object.keys(await changesDisklet.list()) + expect(remainingFiles).to.have.length(50) + + // Verify remaining files are distinct from synced files + for (const fileName of remainingFiles) { + expect(serverFiles).to.not.include(fileName) + } + }) + + it('allows custom maxChangesPerSync option', async () => { + client = createClient({ maxChangesPerSync: 5 }) + + // Create 10 files in changes/ and track expected values + const expectedBoxes: Record< + string, + ReturnType<typeof makeWireEdgeBox> + > = {} + for (let i = 0; i < 10; i++) { + const box = makeWireEdgeBox(i) + expectedBoxes[`file${i}.json`] = box + await changesDisklet.setText(`file${i}.json`, JSON.stringify(box)) + } + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify only 5 files in repo state with correct content + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + const serverFiles = Object.keys(repoState?.files ?? {}) + expect(serverFiles).to.have.length(5) + + // Verify each synced file has correct content + for (const fileName of serverFiles) { + expect(repoState?.files[fileName]).to.deep.equal( + expectedBoxes[fileName] + ) + } + + // Verify 5 files remain in changes/ (the ones not synced) + const remainingFiles = Object.keys(await changesDisklet.list()) + expect(remainingFiles).to.have.length(5) + + // Verify remaining files are distinct from synced files + for (const fileName of remainingFiles) { + expect(serverFiles).to.not.include(fileName) + } + }) + }) + + // --- Gathering Changes & Deletions --- + + describe('gathering outgoing changes', () => { + it('gathers changes from changes/ directory and uploads to server', async () => { + const expectedBox = makeWireEdgeBox(42) + await changesDisklet.setText('myfile.json', JSON.stringify(expectedBox)) + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify file is in repo state with correct content + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files['myfile.json']).to.deep.equal(expectedBox) + }) + + it('gathers deletions from deleted/ directory and removes from server', async () => { + await deletedDisklet.setText('file-to-delete.json', '') + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { 'file-to-delete.json': makeWireEdgeBox(1) } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify file is removed from repo state + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files).to.not.have.property('file-to-delete.json') + }) + + it('handles nested directories in changes/', async () => { + const expectedBox = makeWireEdgeBox(1) + await changesDisklet.setText( + 'foo/bar/nested.json', + JSON.stringify(expectedBox) + ) + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify nested path is in repo state with correct content + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files['foo/bar/nested.json']).to.deep.equal(expectedBox) + }) + +
it('handles nested directories in deleted/', async () => { + await deletedDisklet.setText('foo/bar/nested.json', '') + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { 'foo/bar/nested.json': makeWireEdgeBox(1) } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify nested path is removed from repo state + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files).to.not.have.property('foo/bar/nested.json') + }) + }) + + // --- Interlacing Logic --- + + describe('interlacing changes and deletions', () => { + it('interlaces changes and deletions alternately', async () => { + // Create 3 changes and 3 deletions + const expectedBoxes = [ + makeWireEdgeBox(0), + makeWireEdgeBox(1), + makeWireEdgeBox(2) + ] + for (let i = 0; i < 3; i++) { + await changesDisklet.setText( + `change${i}.json`, + JSON.stringify(expectedBoxes[i]) + ) + await deletedDisklet.setText(`delete${i}.json`, '') + } + + // Pre-populate server with files to delete + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { + 'delete0.json': makeWireEdgeBox(10), + 'delete1.json': makeWireEdgeBox(11), + 'delete2.json': makeWireEdgeBox(12) + } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify repo state has 3 new files with correct content and deleted files removed + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files['change0.json']).to.deep.equal(expectedBoxes[0]) + expect(repoState?.files['change1.json']).to.deep.equal(expectedBoxes[1]) + expect(repoState?.files['change2.json']).to.deep.equal(expectedBoxes[2]) + expect(repoState?.files).to.not.have.property('delete0.json') + expect(repoState?.files).to.not.have.property('delete1.json') + expect(repoState?.files).to.not.have.property('delete2.json') + }) + + it('falls back to changes when deletions exhausted', async () => { + client = createClient({ maxChangesPerSync: 6 }) + + // 5 changes and 1 deletion + const expectedBoxes: Array<ReturnType<typeof makeWireEdgeBox>> = [] + for (let i = 0; i < 5; i++) { + expectedBoxes[i] = makeWireEdgeBox(i) + await changesDisklet.setText( + `change${i}.json`, + JSON.stringify(expectedBoxes[i]) + ) + } + await deletedDisklet.setText('delete0.json', '') + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { 'delete0.json': makeWireEdgeBox(10) } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify all 5 changes uploaded with correct content and deletion processed + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(Object.keys(repoState?.files ??
{})).to.have.length(5) + for (let i = 0; i < 5; i++) { + expect(repoState?.files[`change${i}.json`]).to.deep.equal( + expectedBoxes[i] + ) + } + expect(repoState?.files).to.not.have.property('delete0.json') + }) + + it('falls back to deletions when changes exhausted', async () => { + client = createClient({ maxChangesPerSync: 6 }) + + // 1 change and 5 deletions + const expectedBox = makeWireEdgeBox(0) + await changesDisklet.setText('change0.json', JSON.stringify(expectedBox)) + for (let i = 0; i < 5; i++) { + await deletedDisklet.setText(`delete${i}.json`, '') + } + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { + 'delete0.json': makeWireEdgeBox(10), + 'delete1.json': makeWireEdgeBox(11), + 'delete2.json': makeWireEdgeBox(12), + 'delete3.json': makeWireEdgeBox(13), + 'delete4.json': makeWireEdgeBox(14) + } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify 1 change uploaded with correct content and 5 deletions processed + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files['change0.json']).to.deep.equal(expectedBox) + expect(Object.keys(repoState?.files ?? {})).to.have.length(1) + }) + + it('respects maxChangesPerSync across both changes and deletions', async () => { + client = createClient({ maxChangesPerSync: 5 }) + + // 10 changes and 10 deletions + for (let i = 0; i < 10; i++) { + await changesDisklet.setText( + `change${i}.json`, + JSON.stringify(makeWireEdgeBox(i)) + ) + await deletedDisklet.setText(`delete${i}.json`, '') + } + + // Pre-populate with files to delete + const initialFiles: Record< + string, + ReturnType<typeof makeWireEdgeBox> + > = {} + for (let i = 0; i < 10; i++) { + initialFiles[`delete${i}.json`] = makeWireEdgeBox(10 + i) + } + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: initialFiles + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify only 5 items were synced by checking remaining staging files + const remainingChanges = Object.keys(await changesDisklet.list()).length + const remainingDeletes = Object.keys(await deletedDisklet.list()).length + // Started with 20 total (10 changes + 10 deletes), synced 5, so 15 remain + expect(remainingChanges + remainingDeletes).to.equal(15) + + // Verify server state reflects the 5 synced items + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + const serverFiles = Object.keys(repoState?.files ??
{}) + // Started with 10 delete files, some were removed and some change files added + // With interlacing: 3 changes added, 2 deletes removed = 10 - 2 + 3 = 11 files + // Or: 2 changes added, 3 deletes removed = 10 - 3 + 2 = 9 files + const changesSynced = 10 - remainingChanges + const deletesSynced = 10 - remainingDeletes + expect(changesSynced + deletesSynced).to.equal(5) + expect(serverFiles.length).to.equal(10 - deletesSynced + changesSynced) + }) + }) + + // --- Applying Server Response --- + + describe('applying server response to data/', () => { + it('writes server changes to data/ directory', async () => { + const serverBox = makeWireEdgeBox(99) + + // Server has a file that client doesn't know about + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'hash1', + files: { 'server/file.json': serverBox } + }) + + // Client syncs with no lastHash, gets all server files + await client.syncRepo(disklet, TEST_SYNC_KEY, undefined) + + // Verify file was written to data/ with correct content + const content = await dataDisklet.getText('server/file.json') + const parsed = JSON.parse(content) + expect(parsed).to.deep.equal(serverBox) + }) + + it('deletes files from data/ when server returns null', async () => { + // Pre-populate data/ with a file + await dataDisklet.setText( + 'to-delete.json', + JSON.stringify(makeWireEdgeBox(1)) + ) + + // Server had the file but now it's deleted (empty state) + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'hash-with-file', + files: { 'to-delete.json': makeWireEdgeBox(1) } + }) + + // Stage a deletion locally + await deletedDisklet.setText('to-delete.json', '') + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'hash-with-file') + + // Verify file was deleted from data/ + const listing = await dataDisklet.list() + expect(listing).to.not.have.property('to-delete.json') + }) + + it('handles mix of server changes and deletions', async () => { + const keepBox = makeWireEdgeBox(1) + const newFileBox = makeWireEdgeBox(3) + + // Pre-populate data/ with files + await dataDisklet.setText('keep.json', JSON.stringify(keepBox)) + await dataDisklet.setText( + 'delete-me.json', + JSON.stringify(makeWireEdgeBox(2)) + ) + + // Server state: has existing files + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'old-hash', + files: { + 'keep.json': keepBox, + 'delete-me.json': makeWireEdgeBox(2) + } + }) + + // Stage deletion and new file + await changesDisklet.setText('new-file.json', JSON.stringify(newFileBox)) + await deletedDisklet.setText('delete-me.json', '') + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'old-hash') + + // Verify correct files exist in data/ with correct content + const keepContent = JSON.parse(await dataDisklet.getText('keep.json')) + const newFileContent = JSON.parse( + await dataDisklet.getText('new-file.json') + ) + expect(keepContent).to.deep.equal(keepBox) + expect(newFileContent).to.deep.equal(newFileBox) + + const listing = await dataDisklet.list() + expect(listing).to.not.have.property('delete-me.json') + }) + }) + + // --- Clearing Staging Directories --- + + describe('clearing staging directories after sync', () => { + it('removes synced files from changes/ directory', async () => { + const box1 = makeWireEdgeBox(1) + const box2 = makeWireEdgeBox(2) + await changesDisklet.setText('file1.json', JSON.stringify(box1)) + await changesDisklet.setText('file2.json', JSON.stringify(box2)) + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + 
// Verify changes/ is empty and server has correct content + const listing = await changesDisklet.list() + expect(Object.keys(listing)).to.have.length(0) + + const repoState = mockSyncServer.getRepoState(TEST_SYNC_KEY) + expect(repoState?.files['file1.json']).to.deep.equal(box1) + expect(repoState?.files['file2.json']).to.deep.equal(box2) + }) + + it('removes synced files from deleted/ directory', async () => { + await deletedDisklet.setText('file1.json', '') + await deletedDisklet.setText('file2.json', '') + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'initial', + files: { + 'file1.json': makeWireEdgeBox(1), + 'file2.json': makeWireEdgeBox(2) + } + }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify deleted/ is empty + const listing = await deletedDisklet.list() + expect(Object.keys(listing)).to.have.length(0) + }) + + it('only clears files that were actually synced (respects limit)', async () => { + client = createClient({ maxChangesPerSync: 3 }) + + // Create 10 files + for (let i = 0; i < 10; i++) { + await changesDisklet.setText( + `file${i}.json`, + JSON.stringify(makeWireEdgeBox(i)) + ) + } + + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'initial', files: {} }) + + await client.syncRepo(disklet, TEST_SYNC_KEY, 'initial') + + // Verify only 3 files removed, 7 remain + const listing = await changesDisklet.list() + expect(Object.keys(listing)).to.have.length(7) + }) + + it('does not clear staging directories if sync fails', async () => { + await changesDisklet.setText( + 'file.json', + JSON.stringify(makeWireEdgeBox(1)) + ) + await deletedDisklet.setText('delete.json', '') + + // Don't set repo state - will return 404 + + try { + await client.syncRepo(disklet, TEST_SYNC_KEY, undefined) + expect.fail('Should have thrown') + } catch (err) { + // Expected - repo not found + } + + // Verify staging directories still contain files + const changesListing = await changesDisklet.list() + const deletedListing = await deletedDisklet.list() + expect(changesListing).to.have.property('file.json') + expect(deletedListing).to.have.property('delete.json') + }) + }) + + // --- Return Value --- + + describe('SyncResult return value', () => { + it('returns status.lastHash from server response', async () => { + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'abc123', files: {} }) + + const result = await client.syncRepo(disklet, TEST_SYNC_KEY, undefined) + + expect(result.status.lastHash).to.equal('abc123') + }) + + it('preserves lastHash when no changes (already up to date)', async () => { + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'existing-hash', + files: {} + }) + + const result = await client.syncRepo( + disklet, + TEST_SYNC_KEY, + 'existing-hash' + ) + + expect(result.status.lastHash).to.equal('existing-hash') + }) + + it('returns status.lastSync as current timestamp in seconds', async () => { + mockSyncServer.setRepoState(TEST_SYNC_KEY, { hash: 'hash1', files: {} }) + + const beforeTime = Date.now() / 1000 + const result = await client.syncRepo(disklet, TEST_SYNC_KEY, undefined) + const afterTime = Date.now() / 1000 + + expect(result.status.lastSync).to.be.at.least(beforeTime) + expect(result.status.lastSync).to.be.at.most(afterTime) + }) + + it('returns changes from server response', async () => { + const serverBox = makeWireEdgeBox(42) + mockSyncServer.setRepoState(TEST_SYNC_KEY, { + hash: 'hash1', + files: { 'server/file.json': serverBox } + }) + + const result = await client.syncRepo(disklet, TEST_SYNC_KEY, undefined) + + // Verify the 
change is returned with correct content (cleaned format) + expect(result.changes).to.have.property('server/file.json') + const change = result.changes['server/file.json'] + expect(change).to.not.equal(null) + expect(change?.encryptionType).to.equal(serverBox.encryptionType) + }) + }) +})
diff --git a/test/utils/mock-sync-server.ts b/test/utils/mock-sync-server.ts new file mode 100644 index 0000000..cbffa8e --- /dev/null +++ b/test/utils/mock-sync-server.ts @@ -0,0 +1,185 @@ +import { FetchFunction, FetchResponse } from 'serverlet' + +/** + * Wire format for EdgeBox (string-encoded fields, as sent over HTTP) + */ +export interface WireEdgeBox { + encryptionType: number + iv_hex: string + data_base64: string +} + +/** + * Wire format for ChangeSet (values are WireEdgeBox or null) + */ +export interface WireChangeSet { + [path: string]: WireEdgeBox | null +} + +/** + * Internal repo state + */ +export interface RepoState { + hash: string + files: WireChangeSet +} + +/** + * Return type for createMockSyncServer + */ +export interface MockSyncServer { + /** The mock fetch function to pass to makeSyncClient */ + fetch: FetchFunction + /** Get the current repo state for a syncKey */ + getRepoState: (syncKey: string) => RepoState | undefined + /** Set initial repo state for testing */ + setRepoState: (syncKey: string, state: RepoState) => void +} + +/** + * Generates a simple hash from repo state + */ +function generateHash(files: WireChangeSet): string { + const keys = Object.keys(files).sort((a, b) => a.localeCompare(b)) + const content = keys.map(k => `${k}:${JSON.stringify(files[k])}`).join('|') + // Simple 32-bit string hash over the full stringified content + let hash = 0 + for (let i = 0; i < content.length; i++) { + hash = (hash << 5) - hash + content.charCodeAt(i) + hash |= 0 + } + return Math.abs(hash).toString(16).padStart(8, '0') +} + +/** + * Creates a stateful mock sync server that implements the sync-server protocol. + * Stores repo state in memory and responds to requests accordingly. + */ +export function createMockSyncServer(): MockSyncServer { + const repos: Map<string, RepoState> = new Map() + + // Track which files were known at each hash for delta responses + const hashSnapshots: Map<string, WireChangeSet> = new Map() + + const fetch: FetchFunction = async (url, init = {}) => { + const method = init.method ?? 'GET' + const urlString = url.toString() + + // Parse request body if present + const requestBody = + init.body != null ? JSON.parse(init.body as string) : undefined + + // Parse URL to extract syncKey and hash + // Format: /api/v2/store/:syncKey/:hash? + const urlParts = urlString.split('/api/v2/store/')[1]?.split('/') ?? [] + const syncKey = urlParts[0] ?? '' + const requestHash = urlParts[1] ??
undefined + + let status = 200 + let body: unknown + + if (method === 'PUT') { + // Create repo + if (repos.has(syncKey)) { + status = 409 + body = { message: 'Repo already exists' } + } else { + const initialState: RepoState = { hash: 'initial', files: {} } + repos.set(syncKey, initialState) + hashSnapshots.set('initial', {}) + body = undefined + } + } else if (method === 'GET') { + // Read repo + const repo = repos.get(syncKey) + if (repo == null) { + status = 404 + body = { message: 'Repo not found' } + } else { + // Return changes since requestHash + let changes: WireChangeSet = {} + if (requestHash == null || requestHash === '') { + // No hash provided, return all files + changes = { ...repo.files } + } else if (requestHash === repo.hash) { + // Already up to date, no changes + changes = {} + } else { + // Return diff since requestHash + const oldSnapshot = hashSnapshots.get(requestHash) ?? {} + for (const [path, value] of Object.entries(repo.files)) { + const oldValue = oldSnapshot[path] + if (JSON.stringify(value) !== JSON.stringify(oldValue)) { + changes[path] = value + } + } + // Check for deletions (files in old snapshot but not in current) + for (const path of Object.keys(oldSnapshot)) { + if (!(path in repo.files)) { + changes[path] = null + } + } + } + body = { hash: repo.hash, changes } + } + } else if (method === 'POST') { + // Update repo + const repo = repos.get(syncKey) + if (repo == null) { + status = 404 + body = { message: 'Repo not found' } + } else { + const reqBody = requestBody as { changes: WireChangeSet } + const incomingChanges = reqBody.changes ?? {} + + // Apply incoming changes to repo + for (const [path, value] of Object.entries(incomingChanges)) { + if (value === null) { + // Remove file by creating new object without it + const { [path]: _, ...rest } = repo.files + repo.files = rest + } else { + repo.files[path] = value + } + } + + // Generate new hash and save snapshot + const newHash = generateHash(repo.files) + hashSnapshots.set(newHash, { ...repo.files }) + repo.hash = newHash + + // Return the changes that were applied (echo back) + // In real server this would include any conflicts, but we just echo + body = { hash: newHash, changes: incomingChanges } + } + } else { + status = 405 + body = { message: 'Method not allowed' } + } + + const response: FetchResponse = { + ok: status >= 200 && status < 300, + status, + headers: { + get: () => null, + has: () => false, + forEach: () => {} + }, + text: async () => (body !== undefined ? 
JSON.stringify(body) : ''), + json: async () => body, + arrayBuffer: async () => new ArrayBuffer(0) + } + return response + } + + return { + fetch, + getRepoState(syncKey: string): RepoState | undefined { + return repos.get(syncKey) + }, + setRepoState(syncKey: string, state: RepoState): void { + repos.set(syncKey, state) + hashSnapshots.set(state.hash, { ...state.files }) + } + } +} diff --git a/yarn.lock b/yarn.lock index a5d631b..d7cb8ab 100644 --- a/yarn.lock +++ b/yarn.lock @@ -927,6 +927,13 @@ dir-glob@^3.0.1: dependencies: path-type "^4.0.0" +disklet@^0.5.2: + version "0.5.2" + resolved "https://registry.yarnpkg.com/disklet/-/disklet-0.5.2.tgz#eb1b3bfc2840883cb432aaa16d2c78a345cdd778" + integrity sha512-Fx9LFHztHa47QVCHKaABvk+R/zInmi17HweWM+PQyO3bv55E709IPcJIMOH1WyuPNTPHAAM9GToN6NgpdLhUCQ== + dependencies: + rfc4648 "^1.3.0" + doctrine@1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/doctrine/-/doctrine-1.5.0.tgz#379dce730f6166f76cefa4e6707a159b02c5a6fa" @@ -2701,6 +2708,11 @@ reusify@^1.0.4: resolved "https://registry.yarnpkg.com/reusify/-/reusify-1.0.4.tgz#90da382b1e126efc02146e90845a88db12925d76" integrity sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw== +rfc4648@^1.3.0: + version "1.5.4" + resolved "https://registry.yarnpkg.com/rfc4648/-/rfc4648-1.5.4.tgz#1174c0afba72423a0b70c386ecfeb80aa61b05ca" + integrity sha512-rRg/6Lb+IGfJqO05HZkN50UtY7K/JhxJag1kP23+zyMfrvoB0B7RWv06MbOzoc79RgCdNTiUaNsTT1AJZ7Z+cg== + rfc4648@^1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/rfc4648/-/rfc4648-1.5.0.tgz#1ba940ec1649685ec4d88788dc57fb8e18855055"
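
A minimal usage sketch of the new `syncRepo` flow, for review context. It assumes the patches above are applied; the sync key, file paths, and box contents below are placeholders, and with no `edgeServers` option the client falls back to the built-in server list:

import { makeMemoryDisklet, navigateDisklet } from 'disklet'
import { makeSyncClient } from 'edge-sync-client'

async function demo(): Promise<void> {
  const syncKey = '00'.repeat(32) // placeholder 64-char hex sync key
  const disklet = makeMemoryDisklet()
  const client = makeSyncClient({ maxChangesPerSync: 50 })

  await client.createRepo(syncKey) // once, for a brand-new repo

  // Stage an edit: files under changes/ hold EdgeBox JSON in wire format.
  await navigateDisklet(disklet, 'changes').setText(
    'wallets/wallet.json',
    JSON.stringify({
      encryptionType: 0,
      iv_hex: '000102030405060708090a0b0c0d0e0f',
      data_base64: 'AQID'
    })
  )

  // Stage a deletion: an empty file under deleted/ marks the path for removal.
  await navigateDisklet(disklet, 'deleted').setText('old-file.json', '')

  // The first sync passes lastHash = undefined; later syncs pass the
  // status.lastHash returned here, so the server can send only deltas.
  const result = await client.syncRepo(disklet, syncKey, undefined)
  console.log(result.status.lastHash, Object.keys(result.changes))
}

demo().catch(console.error)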
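And a round-trip sketch for the reworked `EdgeBox` cleaner in base-types: `asEdgeBox` now parses the wire strings into `Uint8Array`s via `asCodec`, and the new `wasEdgeBox` uncleaner re-encodes them. The relative import path is an assumption; adjust it to wherever the module is consumed from:

import { asEdgeBox, wasEdgeBox } from './src/types/base-types'

const wire = {
  encryptionType: 0,
  iv_hex: '000102030405060708090a0b0c0d0e0f',
  data_base64: 'AQID'
}

// asEdgeBox parses the hex and base64 strings into Uint8Arrays...
const box = asEdgeBox(wire)
console.log(box.iv_hex instanceof Uint8Array) // true
console.log(Array.from(box.data_base64)) // [ 1, 2, 3 ]

// ...and wasEdgeBox re-encodes them, with hex lower-cased by asBase16.
console.log(wasEdgeBox(box)) // deep-equals `wire`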