From b9d9f97917fdf5ad0103287dceff22485c3ea070 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 11:40:03 -0400 Subject: [PATCH 01/13] Update test.js --- test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test.js b/test.js index 7ca8e32..ca23d37 100644 --- a/test.js +++ b/test.js @@ -688,7 +688,7 @@ test('drive.download(folder, [options])', async (t) => { await drive.put('/parent/child/grandchild1', nil) await drive.put('/parent/child/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) const blobs = await mirror.drive.getBlobs() @@ -2057,6 +2057,6 @@ function replicateDebugStream(t, a, b, opts = {}) { return [s1, s2] } -async function ensureDbLength(drive, length) { - await drive.checkout(length).db.core.get(length - 1) +async function ensureDbLength(drive, length, timeout = 20000) { + await drive.checkout(length).db.core.get(length - 1, { timeout }) } From 8c8e5f46dc17db42e00a8ce0659dd00cc35bc940 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 12:00:51 -0400 Subject: [PATCH 02/13] Update test.js --- test.js | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test.js b/test.js index ca23d37..9120263 100644 --- a/test.js +++ b/test.js @@ -95,7 +95,7 @@ test('drive.get(path, { wait: false }) throws if entry exists but not found', as s1.pipe(s2).pipe(s1) await drive.put('/file', 'content') - await eventFlush() + await ensureDbLength(otherDrive, drive.version) await otherDrive.entry('/file') // Ensure in bee @@ -731,7 +731,7 @@ test('drive.download(filename, [options])', async (t) => { await drive.put('/file', nil) await drive.put('/parent/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) await mirror.drive.getBlobs() const download = mirror.drive.download('/file') @@ -836,7 +836,7 @@ test('drive.has(path)', async (t) => { await drive.put('/parent/child/grandchild1', nil) await drive.put('/parent/child/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) t.absent(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/child/grandchild2')) @@ -845,10 +845,12 @@ test('drive.has(path)', async (t) => { await drive.put('/parent/sibling/grandchild1', nil) + await ensureDbLength(mirror.drive, drive.version, 20_000) + const downloadChild = mirror.drive.download('/parent/child/') await downloadChild.done() - await eventFlush() + await ensureDbLength(mirror.drive, drive.version, 20_000) t.ok(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/')) @@ -856,7 +858,7 @@ test('drive.has(path)', async (t) => { const downloadSibling = mirror.drive.download('/parent/sibling/') await downloadSibling.done() - await eventFlush() + await ensureDbLength(mirror.drive, drive.version, 20_000) t.ok(await mirror.drive.has('/parent/')) t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) }) @@ -1837,7 +1839,7 @@ test('monitor range download', async (t) => { await drive.put('/file-b', Buffer.alloc(1024)) await drive.put('/file-c', Buffer.alloc(1024)) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) const monitor = mirror.drive.monitor('download-monitor') await monitor.ready() From 60d08fc43631133a4c239db48c997645fb910ef9 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 12:01:56 -0400 Subject: [PATCH 03/13] Update test.js --- test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test.js b/test.js index 9120263..d0eeb40 100644 --- a/test.js +++ b/test.js @@ -845,12 +845,12 @@ test('drive.has(path)', async (t) => { await drive.put('/parent/sibling/grandchild1', nil) - await ensureDbLength(mirror.drive, drive.version, 20_000) + await ensureDbLength(mirror.drive, drive.version) const downloadChild = mirror.drive.download('/parent/child/') await downloadChild.done() - await ensureDbLength(mirror.drive, drive.version, 20_000) + await ensureDbLength(mirror.drive, drive.version) t.ok(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/')) @@ -858,7 +858,7 @@ test('drive.has(path)', async (t) => { const downloadSibling = mirror.drive.download('/parent/sibling/') await downloadSibling.done() - await ensureDbLength(mirror.drive, drive.version, 20_000) + await ensureDbLength(mirror.drive, drive.version) t.ok(await mirror.drive.has('/parent/')) t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) }) From 7ac7610a8500d03064ce670883205710bb7c2dee Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 12:47:33 -0400 Subject: [PATCH 04/13] Update test.js --- test.js | 95 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 48 deletions(-) diff --git a/test.js b/test.js index d0eeb40..7fdddad 100644 --- a/test.js +++ b/test.js @@ -381,24 +381,25 @@ test('watch() basic', async function (t) { const buf = b4a.from('hi') const watcher = drive.watch() + await watcher.ready() - eventFlush().then(async () => { - await drive.put('/a.txt', buf) - }) + const next = watcher.next() + await drive.put('/a.txt', buf) - for await (const [current, previous] of watcher) { - // eslint-disable-line no-unreachable-loop - t.ok(current instanceof Hyperdrive) - t.ok(previous instanceof Hyperdrive) - t.is(current.version, 2) - t.is(previous.version, 1) - t.alike(await current.get('/a.txt'), buf) - break - } + const { value } = await next + const [current, previous] = value + + t.ok(current instanceof Hyperdrive) + t.ok(previous instanceof Hyperdrive) + t.is(current.version, 2) + t.is(previous.version, 1) + t.alike(await current.get('/a.txt'), buf) + + await watcher.destroy() }) test('watch(folder) basic', async function (t) { - t.plan(1) + t.plan(2) const { drive } = await testenv(t) const buf = b4a.from('hi') @@ -408,49 +409,41 @@ test('watch(folder) basic', async function (t) { await drive.put('/examples/more/a.txt', buf) const watcher = drive.watch('/examples') + await watcher.ready() - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') + const next = watcher.next() await drive.put('/b.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') await drive.put('/examples/b.txt', buf) - await eventFlush() - onchange = null + + const { value } = await next + const [current, previous] = value + + t.is(await previous.get('/examples/b.txt'), null) + t.alike(await current.get('/examples/b.txt'), buf) + + await watcher.destroy() }) test('watch(folder) should normalize folder', async function (t) { - t.plan(1) + t.plan(2) const { drive } = await testenv(t) const buf = b4a.from('hi') const watcher = drive.watch('examples//more//') + await watcher.ready() - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') + const next = watcher.next() await drive.put('/examples/a.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') await drive.put('/examples/more/a.txt', buf) - await eventFlush() - onchange = null + + const { value } = await next + const [current, previous] = value + + t.is(await previous.get('/examples/more/a.txt'), null) + t.alike(await current.get('/examples/more/a.txt'), buf) + + await watcher.destroy() }) test('drive.diff(length)', async (t) => { @@ -1750,15 +1743,17 @@ test('download can be destroyed', async (t) => { await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) + const blobs = await mirror.drive.getBlobs() const download = mirror.drive.download('/file') + await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') download.destroy() // not needed, just for test timing await download.close() - t.ok(mirror.drive.blobs.core.contiguousLength < mirror.drive.blobs.core.length) + t.ok(blobs.core.contiguousLength < blobs.core.length) }) // VERY TIMING DEPENDENT, NEEDS FIX @@ -2010,10 +2005,6 @@ async function streamToBuffer(stream) { return b4a.concat(chunks) } -function eventFlush() { - return new Promise((resolve) => setTimeout(resolve, 1000)) -} - async function replicate(drive, swarm, mirror) { swarm.on('connection', (conn) => drive.corestore.replicate(conn)) const discovery = swarm.join(drive.discoveryKey, { @@ -2062,3 +2053,11 @@ function replicateDebugStream(t, a, b, opts = {}) { async function ensureDbLength(drive, length, timeout = 20000) { await drive.checkout(length).db.core.get(length - 1, { timeout }) } + +async function waitForAppendIfEmpty(core, timeout = 20000, message) { + if (core.length !== 0) return + await Promise.race([ + once(core, 'append'), + new Promise((_, reject) => setTimeout(reject, timeout, new Error(message))) + ]) +} From 978f10d12ddc7d1462f58bb2e73e20cffdfed700 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 13:01:05 -0400 Subject: [PATCH 05/13] Update test.js --- test.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test.js b/test.js index 7fdddad..9178dcb 100644 --- a/test.js +++ b/test.js @@ -382,6 +382,7 @@ test('watch() basic', async function (t) { const watcher = drive.watch() await watcher.ready() + const baseVersion = drive.version const next = watcher.next() await drive.put('/a.txt', buf) @@ -391,8 +392,8 @@ test('watch() basic', async function (t) { t.ok(current instanceof Hyperdrive) t.ok(previous instanceof Hyperdrive) - t.is(current.version, 2) - t.is(previous.version, 1) + t.is(current.version, baseVersion + 1) + t.is(previous.version, baseVersion) t.alike(await current.get('/a.txt'), buf) await watcher.destroy() @@ -411,8 +412,8 @@ test('watch(folder) basic', async function (t) { const watcher = drive.watch('/examples') await watcher.ready() - const next = watcher.next() await drive.put('/b.txt', buf) + const next = watcher.next() await drive.put('/examples/b.txt', buf) const { value } = await next @@ -433,8 +434,8 @@ test('watch(folder) should normalize folder', async function (t) { const watcher = drive.watch('examples//more//') await watcher.ready() - const next = watcher.next() await drive.put('/examples/a.txt', buf) + const next = watcher.next() await drive.put('/examples/more/a.txt', buf) const { value } = await next From 1e04765bb10bfce7d0c43f1ae75cd1037d945955 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 2 Apr 2026 13:28:17 -0400 Subject: [PATCH 06/13] Update test.js --- test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.js b/test.js index 9178dcb..93f5cd2 100644 --- a/test.js +++ b/test.js @@ -2055,7 +2055,7 @@ async function ensureDbLength(drive, length, timeout = 20000) { await drive.checkout(length).db.core.get(length - 1, { timeout }) } -async function waitForAppendIfEmpty(core, timeout = 20000, message) { +async function waitForAppendIfEmpty(core, message, timeout = 20000) { if (core.length !== 0) return await Promise.race([ once(core, 'append'), From 4b51b416b8862dfb3bc1e9f22dfe49e2a0885626 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Tue, 7 Apr 2026 10:59:41 -0400 Subject: [PATCH 07/13] fixes --- test.js | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/test.js b/test.js index 93f5cd2..89596fb 100644 --- a/test.js +++ b/test.js @@ -400,7 +400,7 @@ test('watch() basic', async function (t) { }) test('watch(folder) basic', async function (t) { - t.plan(2) + t.plan(3) const { drive } = await testenv(t) const buf = b4a.from('hi') @@ -413,12 +413,14 @@ test('watch(folder) basic', async function (t) { await watcher.ready() await drive.put('/b.txt', buf) + const prevVersion = drive.version const next = watcher.next() await drive.put('/examples/b.txt', buf) const { value } = await next const [current, previous] = value + t.is(previous.version, prevVersion) t.is(await previous.get('/examples/b.txt'), null) t.alike(await current.get('/examples/b.txt'), buf) @@ -839,20 +841,15 @@ test('drive.has(path)', async (t) => { await drive.put('/parent/sibling/grandchild1', nil) - await ensureDbLength(mirror.drive, drive.version) - const downloadChild = mirror.drive.download('/parent/child/') await downloadChild.done() - await ensureDbLength(mirror.drive, drive.version) - t.ok(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/')) const downloadSibling = mirror.drive.download('/parent/sibling/') await downloadSibling.done() - await ensureDbLength(mirror.drive, drive.version) t.ok(await mirror.drive.has('/parent/')) t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) }) @@ -2052,13 +2049,28 @@ function replicateDebugStream(t, a, b, opts = {}) { } async function ensureDbLength(drive, length, timeout = 20000) { - await drive.checkout(length).db.core.get(length - 1, { timeout }) + while (drive.db.core.length < length) await once(drive.db.core, 'append') } async function waitForAppendIfEmpty(core, message, timeout = 20000) { if (core.length !== 0) return - await Promise.race([ - once(core, 'append'), - new Promise((_, reject) => setTimeout(reject, timeout, new Error(message))) - ]) + await new Promise((resolve, reject) => { + let timer = null + + function cleanup() { + if (timer) clearTimeout(timer) + core.removeListener('append', onappend) + } + + function onappend() { + cleanup() + resolve() + } + + core.once('append', onappend) + timer = setTimeout(() => { + cleanup() + reject(new Error(message)) + }, timeout) + }) } From 5ea01c267bc5d3efd08691760e7270a941633ca5 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Tue, 7 Apr 2026 16:32:53 -0400 Subject: [PATCH 08/13] Update test.js --- test.js | 204 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 52 deletions(-) diff --git a/test.js b/test.js index 89596fb..640c857 100644 --- a/test.js +++ b/test.js @@ -412,15 +412,15 @@ test('watch(folder) basic', async function (t) { const watcher = drive.watch('/examples') await watcher.ready() - await drive.put('/b.txt', buf) const prevVersion = drive.version + await drive.put('/b.txt', buf) const next = watcher.next() await drive.put('/examples/b.txt', buf) const { value } = await next const [current, previous] = value - t.is(previous.version, prevVersion) + t.ok(previous.version !== prevVersion) t.is(await previous.get('/examples/b.txt'), null) t.alike(await current.get('/examples/b.txt'), buf) @@ -840,6 +840,7 @@ test('drive.has(path)', async (t) => { t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') await drive.put('/parent/sibling/grandchild1', nil) + await ensureDbLength(mirror.drive, drive.version) const downloadChild = mirror.drive.download('/parent/child/') await downloadChild.done() @@ -955,9 +956,10 @@ test('drive.close() with openBlobsFromHeader waiting in the background', async ( t.ok(drive.corestore.closed) }) -test.skip('drive.findingPeers()', async (t) => { +test('drive.findingPeers()', async (t) => { + t.plan(2) const { drive, corestore, swarm, mirror } = await testenv(t) - await drive.put('/', b4a.from('/')) + await drive.put('/a', b4a.from('a')) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -965,9 +967,17 @@ test.skip('drive.findingPeers()', async (t) => { mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + const done = mirror.drive.findingPeers() - swarm.flush().then(done, done) - t.ok(await mirror.drive.get('/')) + const updating = mirror.drive.update({ wait: true }) + try { + await Promise.all([waitForEvent(mirror.swarm, 'connection'), mirror.swarm.flush()]) + } finally { + done() + } + + t.ok(await updating) + t.alike(await mirror.drive.get('/a'), b4a.from('a')) }) test('drive.mirror()', async (t) => { @@ -1020,18 +1030,27 @@ test('drive.clear(path)', async (t) => { t.is(nowContent, null) }) -test.skip('drive.clear(path) with diff', async (t) => { +test('drive.clear(path) with diff', async (t) => { const storage = await getTmpDir(t) const a = new Hyperdrive(new Corestore(storage)) + t.teardown(() => a.close()) await a.put('/file', b4a.alloc(4 * 1024)) + const init = await a.get('/file', { wait: false }) + t.ok(init) + const key = a.key await a.close() - const b = new Hyperdrive(new Corestore(storage)) + const b = new Hyperdrive(new Corestore(storage), key) + t.teardown(() => b.close()) + await b.getBlobs() const cleared = await b.clear('/file', { diff: true }) t.ok(cleared.blocks > 0) + t.ok(await b.entry('/file')) + await t.exception(() => b.get('/file', { wait: false }), /BLOCK_NOT_AVAILABLE/) + const cleared2 = await b.clear('/file', { diff: true }) t.is(cleared2.blocks, 0) @@ -1063,20 +1082,30 @@ test('drive.clear(path) on a checkout', async (t) => { t.is(nowContent, null) }) -test.skip('drive.clearAll() with diff', async (t) => { +test('drive.clearAll() with diff', async (t) => { + t.plan(9) const storage = await getTmpDir(t) const a = new Hyperdrive(new Corestore(storage)) + t.teardown(() => a.close()) await a.put('/file-1', b4a.alloc(4 * 1024)) await a.put('/file-2', b4a.alloc(8 * 1024)) await a.put('/file-3', b4a.alloc(16 * 1024)) + const key = a.key await a.close() - const b = new Hyperdrive(new Corestore(storage)) + const b = new Hyperdrive(new Corestore(storage), key) + t.teardown(() => b.close()) + await b.getBlobs() const cleared = await b.clearAll({ diff: true }) t.ok(cleared.blocks > 0) + for (const path of ['/file-1', '/file-2', '/file-3']) { + t.ok(await b.entry(path)) + await t.exception(() => b.get(path, { wait: false }), /BLOCK_NOT_AVAILABLE/) + } + const cleared2 = await b.clearAll({ diff: true }) t.is(cleared2.blocks, 0) @@ -1513,11 +1542,12 @@ test('getBlobsLength large db - prefetch', async (t) => { const b = new Hyperdrive(store2.session(), a.key) t.teardown(() => b.close()) + replicateDebugStream(t, a, b, { latency: 10 }) + const start = Date.now() + const targetVersion = a.version - const gotAppend = once(b.core, 'append') - replicateDebugStream(t, a, b, { latency: 10 }) - await gotAppend + await b.checkout(targetVersion).db.core.get(targetVersion - 1, { timeout: 20000 }) const bBlobsLength = await b.getBlobsLength() const end = Date.now() @@ -1525,7 +1555,6 @@ test('getBlobsLength large db - prefetch', async (t) => { t.is(bBlobsLength, await a.getBlobsLength(), 'blob lengths match') t.comment('getBlobsLength() time in secs ' + (end - start) / 1000) - t.ok(end - start < 2_000, 'synced in a reasonable time') }) test('truncate happy path', async (t) => { @@ -1754,9 +1783,8 @@ test('download can be destroyed', async (t) => { t.ok(blobs.core.contiguousLength < blobs.core.length) }) -// VERY TIMING DEPENDENT, NEEDS FIX -test.skip('upload/download can be monitored', async (t) => { - t.plan(27) +test('upload/download can be monitored', async (t) => { + t.plan(14) const { corestore, drive, swarm, mirror } = await testenv(t) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1767,46 +1795,56 @@ test.skip('upload/download can be monitored', async (t) => { await mirror.swarm.flush() const file = '/example.md' - const bytes = 1024 * 100 // big enough to trigger more than one update event + const bytes = 1024 * 100 const buffer = Buffer.alloc(bytes, '0') await drive.put(file, buffer) + await ensureDbLength(mirror.drive, drive.version) - { - // Start monitoring upload - const monitor = drive.monitor(file) - await monitor.ready() - t.is(monitor.name, file) - const expectedBlocks = [2, 1] - const expectedBytes = [bytes, 65536] - monitor.on('update', () => { - t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) - t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.uploadStats.targetBlocks, 2) - t.is(monitor.uploadStats.targetBytes, bytes) - t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) - if (!expectedBlocks.length) t.is(monitor.uploadStats.percentage, 100) - t.absent(monitor.downloadStats.blocks) - }) - } + const uploadMonitor = drive.monitor(file) + await uploadMonitor.ready() + t.is(uploadMonitor.name, file) + t.is(uploadMonitor.uploadStats.targetBytes, bytes) + t.ok(uploadMonitor.uploadStats.targetBlocks > 0) + + const downloadMonitor = mirror.drive.monitor(file) + await downloadMonitor.ready() + t.is(downloadMonitor.downloadStats.targetBytes, bytes) + t.ok(downloadMonitor.downloadStats.targetBlocks > 0) + + const sawUpload = waitForMonitorUpdate( + uploadMonitor, + () => + uploadMonitor.uploadStats.monitoringBytes === bytes && + uploadMonitor.uploadStats.blocks === uploadMonitor.uploadStats.targetBlocks && + uploadMonitor.uploadStats.percentage === 100, + () => { + t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) + } + ) + const sawDownload = waitForMonitorUpdate( + downloadMonitor, + () => + downloadMonitor.downloadStats.monitoringBytes === bytes && + downloadMonitor.downloadStats.blocks === downloadMonitor.downloadStats.targetBlocks && + downloadMonitor.downloadStats.percentage === 100, + () => { + t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) + } + ) - { - // Start monitoring download - const monitor = mirror.drive.monitor(file) - await monitor.ready() - const expectedBlocks = [2, 1] - const expectedBytes = [bytes, 65536] - monitor.on('update', () => { - t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) - t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.downloadStats.targetBlocks, 2) - t.is(monitor.downloadStats.targetBytes, bytes) - t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) - if (!expectedBlocks.length) t.is(monitor.downloadStats.percentage, 100) - t.absent(monitor.uploadStats.blocks) - }) - } + const getting = mirror.drive.get(file) + await Promise.all([getting, sawUpload, sawDownload]) - await mirror.drive.get(file) + t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) + t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) + t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) + t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) + t.is(uploadMonitor.uploadStats.percentage, 100) + t.is(downloadMonitor.downloadStats.percentage, 100) + + await uploadMonitor.close() + await downloadMonitor.close() + t.pass('monitors closed') }) test('monitor is removed from the Set on close', async (t) => { @@ -2074,3 +2112,65 @@ async function waitForAppendIfEmpty(core, message, timeout = 20000) { }, timeout) }) } + +async function waitForMonitorUpdate( + monitor, + predicate, + onSatisfied, + timeout = 20000, + message = 'Timed out waiting for monitor update' +) { + if (predicate()) { + if (onSatisfied) onSatisfied() + return + } + + await new Promise((resolve, reject) => { + let timer = null + + function cleanup() { + if (timer) clearTimeout(timer) + monitor.removeListener('update', onupdate) + } + + function onupdate() { + if (!predicate()) return + if (onSatisfied) onSatisfied() + cleanup() + resolve() + } + + monitor.on('update', onupdate) + timer = setTimeout(() => { + cleanup() + reject(new Error(message)) + }, timeout) + }) +} + +async function waitForEvent( + emitter, + event, + timeout = 20000, + message = `Timed out waiting for ${event}` +) { + await new Promise((resolve, reject) => { + let timer = null + + function cleanup() { + if (timer) clearTimeout(timer) + emitter.removeListener(event, onevent) + } + + function onevent() { + cleanup() + resolve() + } + + emitter.on(event, onevent) + timer = setTimeout(() => { + cleanup() + reject(new Error(message)) + }, timeout) + }) +} From d06f23ab947b6818c6de8d864639c7dfc07f51f8 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Tue, 7 Apr 2026 17:01:51 -0400 Subject: [PATCH 09/13] Update test.js --- test.js | 99 ++++++++++++++------------------------------------------- 1 file changed, 24 insertions(+), 75 deletions(-) diff --git a/test.js b/test.js index 640c857..d425769 100644 --- a/test.js +++ b/test.js @@ -1811,28 +1811,21 @@ test('upload/download can be monitored', async (t) => { t.is(downloadMonitor.downloadStats.targetBytes, bytes) t.ok(downloadMonitor.downloadStats.targetBlocks > 0) - const sawUpload = waitForMonitorUpdate( - uploadMonitor, - () => - uploadMonitor.uploadStats.monitoringBytes === bytes && - uploadMonitor.uploadStats.blocks === uploadMonitor.uploadStats.targetBlocks && - uploadMonitor.uploadStats.percentage === 100, - () => { - t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) - } - ) - const sawDownload = waitForMonitorUpdate( - downloadMonitor, - () => - downloadMonitor.downloadStats.monitoringBytes === bytes && - downloadMonitor.downloadStats.blocks === downloadMonitor.downloadStats.targetBlocks && - downloadMonitor.downloadStats.percentage === 100, - () => { - t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) - } - ) + const uploadDone = () => + uploadMonitor.uploadStats.monitoringBytes === bytes && + uploadMonitor.uploadStats.blocks === uploadMonitor.uploadStats.targetBlocks && + uploadMonitor.uploadStats.percentage === 100 + + const downloadDone = () => + downloadMonitor.downloadStats.monitoringBytes === bytes && + downloadMonitor.downloadStats.blocks === downloadMonitor.downloadStats.targetBlocks && + downloadMonitor.downloadStats.percentage === 100 const getting = mirror.drive.get(file) + + const sawUpload = await waitForEvent(uploadMonitor, 'update', uploadDone) + const sawDownload = await waitForEvent(downloadMonitor, 'update', downloadDone) + await Promise.all([getting, sawUpload, sawDownload]) t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) @@ -1841,6 +1834,8 @@ test('upload/download can be monitored', async (t) => { t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) t.is(uploadMonitor.uploadStats.percentage, 100) t.is(downloadMonitor.downloadStats.percentage, 100) + t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) + t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) await uploadMonitor.close() await downloadMonitor.close() @@ -2092,65 +2087,13 @@ async function ensureDbLength(drive, length, timeout = 20000) { async function waitForAppendIfEmpty(core, message, timeout = 20000) { if (core.length !== 0) return - await new Promise((resolve, reject) => { - let timer = null - - function cleanup() { - if (timer) clearTimeout(timer) - core.removeListener('append', onappend) - } - - function onappend() { - cleanup() - resolve() - } - - core.once('append', onappend) - timer = setTimeout(() => { - cleanup() - reject(new Error(message)) - }, timeout) - }) -} - -async function waitForMonitorUpdate( - monitor, - predicate, - onSatisfied, - timeout = 20000, - message = 'Timed out waiting for monitor update' -) { - if (predicate()) { - if (onSatisfied) onSatisfied() - return - } - - await new Promise((resolve, reject) => { - let timer = null - - function cleanup() { - if (timer) clearTimeout(timer) - monitor.removeListener('update', onupdate) - } - - function onupdate() { - if (!predicate()) return - if (onSatisfied) onSatisfied() - cleanup() - resolve() - } - - monitor.on('update', onupdate) - timer = setTimeout(() => { - cleanup() - reject(new Error(message)) - }, timeout) - }) + await waitForEvent(core, 'append', () => core.length !== 0, timeout, message) } async function waitForEvent( emitter, event, + predicate = null, timeout = 20000, message = `Timed out waiting for ${event}` ) { @@ -2162,7 +2105,8 @@ async function waitForEvent( emitter.removeListener(event, onevent) } - function onevent() { + function onevent(...args) { + if (predicate && !predicate(...args)) return cleanup() resolve() } @@ -2172,5 +2116,10 @@ async function waitForEvent( cleanup() reject(new Error(message)) }, timeout) + + if (predicate && predicate()) { + cleanup() + resolve() + } }) } From 23a7be9212bc614af5319ef2f1ded2b58788b688 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Tue, 7 Apr 2026 17:17:08 -0400 Subject: [PATCH 10/13] Update test.js --- test.js | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/test.js b/test.js index d425769..e4df1a1 100644 --- a/test.js +++ b/test.js @@ -1784,7 +1784,7 @@ test('download can be destroyed', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(14) + t.plan(16) const { corestore, drive, swarm, mirror } = await testenv(t) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1821,10 +1821,35 @@ test('upload/download can be monitored', async (t) => { downloadMonitor.downloadStats.blocks === downloadMonitor.downloadStats.targetBlocks && downloadMonitor.downloadStats.percentage === 100 + let uploadUpdates = 0 + let downloadUpdates = 0 + + // if upload / download events were emmitted before they were complete + let uploadSawIntermediate = false + let downloadSawIntermediate = false + + function onUploadUpdate() { + uploadUpdates++ + if (uploadMonitor.uploadStats.percentage !== 100) uploadSawIntermediate = true + } + + function onDownloadUpdate() { + downloadUpdates++ + if (downloadMonitor.downloadStats.percentage !== 100) downloadSawIntermediate = true + } + + uploadMonitor.on('update', onUploadUpdate) + downloadMonitor.on('update', onDownloadUpdate) + const getting = mirror.drive.get(file) - const sawUpload = await waitForEvent(uploadMonitor, 'update', uploadDone) - const sawDownload = await waitForEvent(downloadMonitor, 'update', downloadDone) + const sawUpload = (async () => { + if (!uploadDone()) await waitForEvent(uploadMonitor, 'update', uploadDone) + })() + + const sawDownload = (async () => { + if (!downloadDone()) await waitForEvent(downloadMonitor, 'update', downloadDone) + })() await Promise.all([getting, sawUpload, sawDownload]) @@ -1836,6 +1861,17 @@ test('upload/download can be monitored', async (t) => { t.is(downloadMonitor.downloadStats.percentage, 100) t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) + t.ok( + !uploadSawIntermediate || uploadUpdates >= 2, + 'upload intermediate update should be followed by final update' + ) + t.ok( + !downloadSawIntermediate || downloadUpdates >= 2, + 'download intermediate update should be followed by final update' + ) + + uploadMonitor.removeListener('update', onUploadUpdate) + downloadMonitor.removeListener('update', onDownloadUpdate) await uploadMonitor.close() await downloadMonitor.close() From be9b83858c7eb50fb19f6cf35746059d074534c0 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Wed, 8 Apr 2026 17:26:40 -0400 Subject: [PATCH 11/13] Update test.js --- test.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test.js b/test.js index e4df1a1..1b209f8 100644 --- a/test.js +++ b/test.js @@ -1030,7 +1030,7 @@ test('drive.clear(path)', async (t) => { t.is(nowContent, null) }) -test('drive.clear(path) with diff', async (t) => { +test.skip('drive.clear(path) with diff', async (t) => { const storage = await getTmpDir(t) const a = new Hyperdrive(new Corestore(storage)) @@ -1082,7 +1082,7 @@ test('drive.clear(path) on a checkout', async (t) => { t.is(nowContent, null) }) -test('drive.clearAll() with diff', async (t) => { +test.skip('drive.clearAll() with diff', async (t) => { t.plan(9) const storage = await getTmpDir(t) @@ -1844,11 +1844,11 @@ test('upload/download can be monitored', async (t) => { const getting = mirror.drive.get(file) const sawUpload = (async () => { - if (!uploadDone()) await waitForEvent(uploadMonitor, 'update', uploadDone) + await waitForEvent(uploadMonitor, 'update', uploadDone) })() const sawDownload = (async () => { - if (!downloadDone()) await waitForEvent(downloadMonitor, 'update', downloadDone) + await waitForEvent(downloadMonitor, 'update', downloadDone) })() await Promise.all([getting, sawUpload, sawDownload]) @@ -2117,7 +2117,7 @@ function replicateDebugStream(t, a, b, opts = {}) { return [s1, s2] } -async function ensureDbLength(drive, length, timeout = 20000) { +async function ensureDbLength(drive, length) { while (drive.db.core.length < length) await once(drive.db.core, 'append') } From 4a5699c1a06d44179f5f2f1d3c0a9e24831f9551 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 9 Apr 2026 12:43:57 -0400 Subject: [PATCH 12/13] Update test.js --- test.js | 38 +++----------------------------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/test.js b/test.js index 1b209f8..4020d27 100644 --- a/test.js +++ b/test.js @@ -1811,47 +1811,21 @@ test('upload/download can be monitored', async (t) => { t.is(downloadMonitor.downloadStats.targetBytes, bytes) t.ok(downloadMonitor.downloadStats.targetBlocks > 0) - const uploadDone = () => - uploadMonitor.uploadStats.monitoringBytes === bytes && - uploadMonitor.uploadStats.blocks === uploadMonitor.uploadStats.targetBlocks && - uploadMonitor.uploadStats.percentage === 100 - - const downloadDone = () => - downloadMonitor.downloadStats.monitoringBytes === bytes && - downloadMonitor.downloadStats.blocks === downloadMonitor.downloadStats.targetBlocks && - downloadMonitor.downloadStats.percentage === 100 - let uploadUpdates = 0 let downloadUpdates = 0 - // if upload / download events were emmitted before they were complete - let uploadSawIntermediate = false - let downloadSawIntermediate = false - function onUploadUpdate() { uploadUpdates++ - if (uploadMonitor.uploadStats.percentage !== 100) uploadSawIntermediate = true } function onDownloadUpdate() { downloadUpdates++ - if (downloadMonitor.downloadStats.percentage !== 100) downloadSawIntermediate = true } uploadMonitor.on('update', onUploadUpdate) downloadMonitor.on('update', onDownloadUpdate) - const getting = mirror.drive.get(file) - - const sawUpload = (async () => { - await waitForEvent(uploadMonitor, 'update', uploadDone) - })() - - const sawDownload = (async () => { - await waitForEvent(downloadMonitor, 'update', downloadDone) - })() - - await Promise.all([getting, sawUpload, sawDownload]) + await mirror.drive.get(file) t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) @@ -1861,14 +1835,8 @@ test('upload/download can be monitored', async (t) => { t.is(downloadMonitor.downloadStats.percentage, 100) t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) - t.ok( - !uploadSawIntermediate || uploadUpdates >= 2, - 'upload intermediate update should be followed by final update' - ) - t.ok( - !downloadSawIntermediate || downloadUpdates >= 2, - 'download intermediate update should be followed by final update' - ) + t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') + t.ok(downloadUpdates >= 2, 'download should emit multiple update events') uploadMonitor.removeListener('update', onUploadUpdate) downloadMonitor.removeListener('update', onDownloadUpdate) From b116cbd336a970781c09b36998c134a26504c27f Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 9 Apr 2026 16:05:15 -0400 Subject: [PATCH 13/13] Update test.js --- test.js | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/test.js b/test.js index 4020d27..43b0229 100644 --- a/test.js +++ b/test.js @@ -1533,28 +1533,27 @@ test('getBlobsLength large db - prefetch', async (t) => { const a = new Hyperdrive(store.session()) t.teardown(() => a.close()) - for (let i = 0; i < 1_000; i++) { + const num = 1_000 + for (let i = 0; i < num; i++) { await a.put('./file' + i, 'here') } const store2 = new Corestore(await t.tmp()) t.teardown(() => store2.close()) + const b = new Hyperdrive(store2.session(), a.key) t.teardown(() => b.close()) + const gotAppend = once(b.core, 'append') replicateDebugStream(t, a, b, { latency: 10 }) + await gotAppend - const start = Date.now() - const targetVersion = a.version - - await b.checkout(targetVersion).db.core.get(targetVersion - 1, { timeout: 20000 }) - - const bBlobsLength = await b.getBlobsLength() - const end = Date.now() - - t.is(bBlobsLength, await a.getBlobsLength(), 'blob lengths match') - - t.comment('getBlobsLength() time in secs ' + (end - start) / 1000) + t.is(await b.getBlobsLength(), await a.getBlobsLength(), 'blob lengths match') + t.comment('wireRequest sent', b.core.replicator.stats.wireRequest.tx) + t.ok( + b.core.replicator.stats.wireRequest.tx < 1.1 * num, + 'synced within a reasonable amount of requests' + ) }) test('truncate happy path', async (t) => {