Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Unreleased

- added: `syncRepo(disklet, syncKey, lastHash)` method on `SyncClient` for Disklet-based repository synchronization with `changes/`, `deleted/`, and `data/` staging directories

## 0.2.9-1 (2024-07-30)

- removed: Remove InfoClient usage for SyncClient instances

## 0.2.9 (2024-02-26)

- added: Detect conflicts while creating repos, and report these with a new `ConflictError` type.
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "edge-sync-client",
"version": "0.2.9",
"version": "0.2.9-1",
"description": "Library for accessing the Edge data-sync system",
"keywords": [
"Edge"
Expand Down Expand Up @@ -59,6 +59,9 @@
"rfc4648": "^1.5.0",
"serverlet": "^0.1.0"
},
"peerDependencies": {
"disklet": ">=0.5.0"
},
"devDependencies": {
"@babel/core": "^7.10.4",
"@rollup/plugin-node-resolve": "^8.1.0",
Expand All @@ -71,6 +74,7 @@
"@typescript-eslint/parser": "^4.8.2",
"babel-eslint": "^10.1.0",
"chai": "^4.2.0",
"disklet": "^0.5.2",
"eslint": "^7.14.0",
"eslint-config-standard-kit": "0.15.1",
"eslint-plugin-flowtype": "^5.2.0",
Expand Down
6 changes: 5 additions & 1 deletion rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import packageJson from './package.json'
const extensions = ['.ts']

export default {
external: [/@babel\/runtime/, ...Object.keys(packageJson.dependencies)],
external: [
/@babel\/runtime/,
...Object.keys(packageJson.dependencies),
...Object.keys(packageJson.peerDependencies || {})
],
input: 'src/index.ts',
output: [
{ file: packageJson.main, format: 'cjs' },
Expand Down
165 changes: 160 additions & 5 deletions src/client/sync-client.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
import { asMaybe, Cleaner, uncleaner } from 'cleaners'
import crossFetch from 'cross-fetch'
import { Disklet, navigateDisklet } from 'disklet'
import { FetchFunction, FetchResponse } from 'serverlet'

import { EdgeServers } from '../types/base-types'
import { asEdgeBox, EdgeServers, wasEdgeBox } from '../types/base-types'
import { ConflictError } from '../types/error'
import {
asGetStoreResponse,
asPostStoreBody,
asPostStoreResponse,
asPutStoreResponse,
asServerErrorResponse,
ChangeSet,
GetStoreResponse,
PostStoreBody,
PostStoreResponse,
PutStoreResponse
} from '../types/rest-types'
import { syncKeyToRepoId } from '../util/security'
import { shuffle } from '../util/shuffle'
import { makeInfoClient } from './info-client'

const defaultEdgeServers: Required<EdgeServers> = {
infoServers: ['https://info-eu1.edge.app', 'https://info-us1.edge.app'],
syncServers: [
'https://sync-us1.edge.app',
'https://sync-us2.edge.app',
'https://sync-us3.edge.app',
'https://sync-us4.edge.app',
'https://sync-us5.edge.app',
'https://sync-us6.edge.app',
'https://sync-eu.edge.app'
]
}

export interface SyncClient {
createRepo: (syncKey: string, apiKey?: string) => Promise<PutStoreResponse>
Expand All @@ -30,21 +44,47 @@ export interface SyncClient {
lastHash: string | undefined,
body: PostStoreBody
) => Promise<PostStoreResponse>
/**
* Sync the repository using the provided Disklet.
* Gathers changes from `changes/` and deletions from `deleted/` directories,
* sends them to the server, applies server's response to `data/`, and clears staging directories.
*/
syncRepo: (
disklet: Disklet,
syncKey: string,
lastHash: string | undefined
) => Promise<SyncResult>
}

export interface SyncResult {
/** The sync status after sync */
status: SyncStatus
/** The changes received from the server */
changes: ChangeSet
}

export interface SyncStatus {
/** The last known hash from the server */
lastHash: string | undefined
/** Unix timestamp of the last sync (seconds) */
lastSync: number
}

export interface SyncClientOptions {
fetch?: FetchFunction
log?: (message: string) => void
edgeServers?: EdgeServers
/** Maximum number of changes to send per sync (default: 100) */
maxChangesPerSync?: number
}

export function makeSyncClient(opts: SyncClientOptions = {}): SyncClient {
const { fetch = crossFetch, log = () => {} } = opts
const infoClient = makeInfoClient(opts)
const { fetch = crossFetch, log = () => {}, maxChangesPerSync = 100 } = opts
const syncServers: Required<EdgeServers>['syncServers'] =
opts.edgeServers?.syncServers ?? defaultEdgeServers.syncServers

// Returns the sync servers from the info client shuffled
async function shuffledSyncServers(): Promise<string[]> {
const { syncServers } = await infoClient.getEdgeServers()
return shuffle(syncServers)
}

Expand Down Expand Up @@ -170,6 +210,94 @@ export function makeSyncClient(opts: SyncClientOptions = {}): SyncClient {
}

throw error
},

async syncRepo(disklet, syncKey, lastHash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems reasonable. However, this should not be public API. Instead, we want to export a makeSyncedDisklet(parentPath: Disklet) => Disklet & { sync() => Promise<string[]> }.

You do const myDisklet = makeSyncedDisklet(...), and then you can do myDisklet.getText, myDisklet.delete, myDisklet.setText, and so forth. This library automatically routes the changes to the right places (/data, /changes, /deleted). Finally, you can do myDisklet.sync() to sync with the server.

This keeps the details fully black-boxed. The contents of the "status.json", the layout of the sub-folders (/data, /changes, /deleted), and the details of the sync algorithm are fully abstracted. Remember when we built the change server? We built the "legacy" API compatible with the existing core, and we also defined the "new" API with faster sync. We never implemented the client side of the "new" API, but if we wanted to do that, we would want edge-sync-client to be "black-boxed" so the change can be seamless. If the core has to "know" the format of the "status.json" or the general on-disk layout of the repo, we have failed.

// Get subdisklets for changes, deletions, and data
const changesDisklet = navigateDisklet(disklet, 'changes')
const deletedDisklet = navigateDisklet(disklet, 'deleted')
const dataDisklet = navigateDisklet(disklet, 'data')

// List both directories (each with limit to avoid over-listing)
const allChangePaths = await deepListWithLimit(
changesDisklet,
maxChangesPerSync
)
const allDeletePaths = await deepListWithLimit(
deletedDisklet,
maxChangesPerSync
)

// Interlace changes and deletions, respecting the limit
const changePaths: string[] = []
const deletePaths: string[] = []
const outgoingChanges: ChangeSet = {}
const maxChangesCount = Math.min(
maxChangesPerSync,
allChangePaths.length + allDeletePaths.length
)
for (let i = 0; i < maxChangesCount; i++) {
const pickChange = async (): Promise<void> => {
const path = allChangePaths[changePaths.length]
const data = await changesDisklet.getText(path)
outgoingChanges[path] = asEdgeBox(JSON.parse(data))
changePaths.push(path)
}
const pickDeletion = (): void => {
const path = allDeletePaths[deletePaths.length]
outgoingChanges[path] = null
deletePaths.push(path)
}

if (i % 2 === 0) {
if (changePaths.length < allChangePaths.length) {
await pickChange()
} else {
pickDeletion()
}
} else {
if (deletePaths.length < allDeletePaths.length) {
pickDeletion()
} else {
await pickChange()
}
}
}

// Use readRepo if no changes, updateRepo otherwise
const hasChanges = Object.keys(outgoingChanges).length > 0
const response = hasChanges
? await this.updateRepo(syncKey, lastHash, { changes: outgoingChanges })
: await this.readRepo(syncKey, lastHash)

// Apply server's changes to the data disklet
for (const [path, change] of Object.entries(response.changes)) {
if (change === null) {
// Delete the file from data directory
await dataDisklet.delete(path)
} else {
// Write the file to data directory
await dataDisklet.setText(path, JSON.stringify(wasEdgeBox(change)))
}
}

// Clear synced changes from changes/ directory
for (const path of changePaths) {
await changesDisklet.delete(path)
}

// Clear synced deletions from deleted/ directory
for (const path of deletePaths) {
await deletedDisklet.delete(path)
}

return {
status: {
lastHash: response.hash ?? lastHash,
lastSync: Date.now() / 1000
},
changes: response.changes
}
}
}
}
Expand All @@ -183,3 +311,30 @@ interface ApiRequest {
}

const wasPostStoreBody = uncleaner(asPostStoreBody)

// Disklet helper functions

/**
* Lists all files in a disklet recursively, up to a limit.
* Returns a list of full paths.
*/

async function deepListWithLimit(
disklet: Disklet,
limit: number,
path: string = ''
): Promise<string[]> {
const list = await disklet.list(path)
const paths = Object.keys(list).filter(path => list[path] === 'file')
const folders = Object.keys(list).filter(path => list[path] === 'folder')

// Loop over folders to get subpaths
for (const folder of folders) {
if (paths.length >= limit) break
const remaining = limit - paths.length
const subpaths = await deepListWithLimit(disklet, remaining, folder)
paths.push(...subpaths.slice(0, remaining))
}

return paths
}
55 changes: 35 additions & 20 deletions src/index.flow.js
Original file line number Diff line number Diff line change
@@ -1,48 +1,58 @@
// @flow

import { type Disklet } from 'disklet'
import { type FetchFunction } from 'serverlet'

type EdgeServers = {
infoServers?: string[],
syncServers?: string[]
}

type SyncClientOptions = {
fetch?: FetchFunction,
log?: (message: string) => void,
edgeServers?: EdgeServers
}

type EdgeBox = {
iv_hex: string,
encryptionType: number,
data_base64: string
data_base64: string,
iv_hex: string
}

type ChangeSet = {
[path: string]: EdgeBox | null
}

type PutStoreResponse = void

type GetStoreResponse = {
hash?: string | void,
changes: {
[keys: string]: EdgeBox | null
}
changes: ChangeSet
}

type PostStoreBody = {
changes: {
[keys: string]: EdgeBox | null
}
changes: ChangeSet
}

type PostStoreResponse = {
hash: string,
changes: {
[keys: string]: EdgeBox | null
}
changes: ChangeSet
}

export type SyncStatus = {
lastHash: string | void,
lastSync: number
}

export type SyncResult = {
status: SyncStatus,
changes: ChangeSet
}

export type SyncClientOptions = {
fetch?: FetchFunction,
log?: (message: string) => void,
edgeServers?: EdgeServers,
maxChangesPerSync?: number
}

export type SyncClient = {
createRepo: (syncKey: string) => Promise<PutStoreResponse>,
createRepo: (syncKey: string, apiKey?: string) => Promise<PutStoreResponse>,
readRepo: (
syncKey: string,
lastHash: string | void
Expand All @@ -51,7 +61,12 @@ export type SyncClient = {
syncKey: string,
lastHash: string | void,
body: PostStoreBody
) => Promise<PostStoreResponse>
) => Promise<PostStoreResponse>,
syncRepo: (
disklet: Disklet,
syncKey: string,
lastHash: string | void
) => Promise<SyncResult>
}

declare export function makeSyncClient(opts: SyncClientOptions): SyncClient
declare export function makeSyncClient(opts?: SyncClientOptions): SyncClient
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export * from './types/rest-types'
export { asMaybeConflictError, ConflictError } from './types/error'

// Client
export type { SyncClient, SyncClientOptions } from './client/sync-client'
export type {
SyncStatus,
SyncClient,
SyncClientOptions,
SyncResult
} from './client/sync-client'
export { makeSyncClient } from './client/sync-client'

// Util
Expand Down
Loading
Loading