diff --git a/test.js b/test.js index 7ca8e32..43b0229 100644 --- a/test.js +++ b/test.js @@ -95,7 +95,7 @@ test('drive.get(path, { wait: false }) throws if entry exists but not found', as s1.pipe(s2).pipe(s1) await drive.put('/file', 'content') - await eventFlush() + await ensureDbLength(otherDrive, drive.version) await otherDrive.entry('/file') // Ensure in bee @@ -381,24 +381,26 @@ test('watch() basic', async function (t) { const buf = b4a.from('hi') const watcher = drive.watch() + await watcher.ready() + const baseVersion = drive.version - eventFlush().then(async () => { - await drive.put('/a.txt', buf) - }) + const next = watcher.next() + await drive.put('/a.txt', buf) - for await (const [current, previous] of watcher) { - // eslint-disable-line no-unreachable-loop - t.ok(current instanceof Hyperdrive) - t.ok(previous instanceof Hyperdrive) - t.is(current.version, 2) - t.is(previous.version, 1) - t.alike(await current.get('/a.txt'), buf) - break - } + const { value } = await next + const [current, previous] = value + + t.ok(current instanceof Hyperdrive) + t.ok(previous instanceof Hyperdrive) + t.is(current.version, baseVersion + 1) + t.is(previous.version, baseVersion) + t.alike(await current.get('/a.txt'), buf) + + await watcher.destroy() }) test('watch(folder) basic', async function (t) { - t.plan(1) + t.plan(3) const { drive } = await testenv(t) const buf = b4a.from('hi') @@ -408,49 +410,43 @@ test('watch(folder) basic', async function (t) { await drive.put('/examples/more/a.txt', buf) const watcher = drive.watch('/examples') + await watcher.ready() - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') + const prevVersion = drive.version await drive.put('/b.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') + const next = watcher.next() await drive.put('/examples/b.txt', buf) - await eventFlush() - onchange = null + + const { value } = await next + const [current, previous] = value + + t.ok(previous.version !== prevVersion) + t.is(await previous.get('/examples/b.txt'), null) + t.alike(await current.get('/examples/b.txt'), buf) + + await watcher.destroy() }) test('watch(folder) should normalize folder', async function (t) { - t.plan(1) + t.plan(2) const { drive } = await testenv(t) const buf = b4a.from('hi') const watcher = drive.watch('examples//more//') + await watcher.ready() - let next = watcher.next() - let onchange = null - next.then((data) => { - next = watcher.next() - onchange(data) - }) - - onchange = () => t.fail('should not trigger changes') await drive.put('/examples/a.txt', buf) - await eventFlush() - onchange = null - - onchange = () => t.pass('change') + const next = watcher.next() await drive.put('/examples/more/a.txt', buf) - await eventFlush() - onchange = null + + const { value } = await next + const [current, previous] = value + + t.is(await previous.get('/examples/more/a.txt'), null) + t.alike(await current.get('/examples/more/a.txt'), buf) + + await watcher.destroy() }) test('drive.diff(length)', async (t) => { @@ -688,7 +684,7 @@ test('drive.download(folder, [options])', async (t) => { await drive.put('/parent/child/grandchild1', nil) await drive.put('/parent/child/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) const blobs = await mirror.drive.getBlobs() @@ -731,7 +727,7 @@ test('drive.download(filename, [options])', async (t) => { await drive.put('/file', nil) await drive.put('/parent/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) await mirror.drive.getBlobs() const download = mirror.drive.download('/file') @@ -836,7 +832,7 @@ test('drive.has(path)', async (t) => { await drive.put('/parent/child/grandchild1', nil) await drive.put('/parent/child/grandchild2', nil) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) t.absent(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/child/grandchild2')) @@ -844,19 +840,17 @@ test('drive.has(path)', async (t) => { t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') await drive.put('/parent/sibling/grandchild1', nil) + await ensureDbLength(mirror.drive, drive.version) const downloadChild = mirror.drive.download('/parent/child/') await downloadChild.done() - await eventFlush() - t.ok(await mirror.drive.has('/parent/child/')) t.absent(await mirror.drive.has('/parent/')) const downloadSibling = mirror.drive.download('/parent/sibling/') await downloadSibling.done() - await eventFlush() t.ok(await mirror.drive.has('/parent/')) t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) }) @@ -962,9 +956,10 @@ test('drive.close() with openBlobsFromHeader waiting in the background', async ( t.ok(drive.corestore.closed) }) -test.skip('drive.findingPeers()', async (t) => { +test('drive.findingPeers()', async (t) => { + t.plan(2) const { drive, corestore, swarm, mirror } = await testenv(t) - await drive.put('/', b4a.from('/')) + await drive.put('/a', b4a.from('a')) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -972,9 +967,17 @@ test.skip('drive.findingPeers()', async (t) => { mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + const done = mirror.drive.findingPeers() - swarm.flush().then(done, done) - t.ok(await mirror.drive.get('/')) + const updating = mirror.drive.update({ wait: true }) + try { + await Promise.all([waitForEvent(mirror.swarm, 'connection'), mirror.swarm.flush()]) + } finally { + done() + } + + t.ok(await updating) + t.alike(await mirror.drive.get('/a'), b4a.from('a')) }) test('drive.mirror()', async (t) => { @@ -1031,14 +1034,23 @@ test.skip('drive.clear(path) with diff', async (t) => { const storage = await getTmpDir(t) const a = new Hyperdrive(new Corestore(storage)) + t.teardown(() => a.close()) await a.put('/file', b4a.alloc(4 * 1024)) + const init = await a.get('/file', { wait: false }) + t.ok(init) + const key = a.key await a.close() - const b = new Hyperdrive(new Corestore(storage)) + const b = new Hyperdrive(new Corestore(storage), key) + t.teardown(() => b.close()) + await b.getBlobs() const cleared = await b.clear('/file', { diff: true }) t.ok(cleared.blocks > 0) + t.ok(await b.entry('/file')) + await t.exception(() => b.get('/file', { wait: false }), /BLOCK_NOT_AVAILABLE/) + const cleared2 = await b.clear('/file', { diff: true }) t.is(cleared2.blocks, 0) @@ -1071,19 +1083,29 @@ test('drive.clear(path) on a checkout', async (t) => { }) test.skip('drive.clearAll() with diff', async (t) => { + t.plan(9) const storage = await getTmpDir(t) const a = new Hyperdrive(new Corestore(storage)) + t.teardown(() => a.close()) await a.put('/file-1', b4a.alloc(4 * 1024)) await a.put('/file-2', b4a.alloc(8 * 1024)) await a.put('/file-3', b4a.alloc(16 * 1024)) + const key = a.key await a.close() - const b = new Hyperdrive(new Corestore(storage)) + const b = new Hyperdrive(new Corestore(storage), key) + t.teardown(() => b.close()) + await b.getBlobs() const cleared = await b.clearAll({ diff: true }) t.ok(cleared.blocks > 0) + for (const path of ['/file-1', '/file-2', '/file-3']) { + t.ok(await b.entry(path)) + await t.exception(() => b.get(path, { wait: false }), /BLOCK_NOT_AVAILABLE/) + } + const cleared2 = await b.clearAll({ diff: true }) t.is(cleared2.blocks, 0) @@ -1511,28 +1533,27 @@ test('getBlobsLength large db - prefetch', async (t) => { const a = new Hyperdrive(store.session()) t.teardown(() => a.close()) - for (let i = 0; i < 1_000; i++) { + const num = 1_000 + for (let i = 0; i < num; i++) { await a.put('./file' + i, 'here') } const store2 = new Corestore(await t.tmp()) t.teardown(() => store2.close()) + const b = new Hyperdrive(store2.session(), a.key) t.teardown(() => b.close()) - const start = Date.now() - const gotAppend = once(b.core, 'append') replicateDebugStream(t, a, b, { latency: 10 }) await gotAppend - const bBlobsLength = await b.getBlobsLength() - const end = Date.now() - - t.is(bBlobsLength, await a.getBlobsLength(), 'blob lengths match') - - t.comment('getBlobsLength() time in secs ' + (end - start) / 1000) - t.ok(end - start < 2_000, 'synced in a reasonable time') + t.is(await b.getBlobsLength(), await a.getBlobsLength(), 'blob lengths match') + t.comment('wireRequest sent', b.core.replicator.stats.wireRequest.tx) + t.ok( + b.core.replicator.stats.wireRequest.tx < 1.1 * num, + 'synced within a reasonable amount of requests' + ) }) test('truncate happy path', async (t) => { @@ -1748,20 +1769,21 @@ test('download can be destroyed', async (t) => { await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) + const blobs = await mirror.drive.getBlobs() const download = mirror.drive.download('/file') + await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') download.destroy() // not needed, just for test timing await download.close() - t.ok(mirror.drive.blobs.core.contiguousLength < mirror.drive.blobs.core.length) + t.ok(blobs.core.contiguousLength < blobs.core.length) }) -// VERY TIMING DEPENDENT, NEEDS FIX -test.skip('upload/download can be monitored', async (t) => { - t.plan(27) +test('upload/download can be monitored', async (t) => { + t.plan(16) const { corestore, drive, swarm, mirror } = await testenv(t) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1772,46 +1794,55 @@ test.skip('upload/download can be monitored', async (t) => { await mirror.swarm.flush() const file = '/example.md' - const bytes = 1024 * 100 // big enough to trigger more than one update event + const bytes = 1024 * 100 const buffer = Buffer.alloc(bytes, '0') await drive.put(file, buffer) + await ensureDbLength(mirror.drive, drive.version) - { - // Start monitoring upload - const monitor = drive.monitor(file) - await monitor.ready() - t.is(monitor.name, file) - const expectedBlocks = [2, 1] - const expectedBytes = [bytes, 65536] - monitor.on('update', () => { - t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) - t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.uploadStats.targetBlocks, 2) - t.is(monitor.uploadStats.targetBytes, bytes) - t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) - if (!expectedBlocks.length) t.is(monitor.uploadStats.percentage, 100) - t.absent(monitor.downloadStats.blocks) - }) + const uploadMonitor = drive.monitor(file) + await uploadMonitor.ready() + t.is(uploadMonitor.name, file) + t.is(uploadMonitor.uploadStats.targetBytes, bytes) + t.ok(uploadMonitor.uploadStats.targetBlocks > 0) + + const downloadMonitor = mirror.drive.monitor(file) + await downloadMonitor.ready() + t.is(downloadMonitor.downloadStats.targetBytes, bytes) + t.ok(downloadMonitor.downloadStats.targetBlocks > 0) + + let uploadUpdates = 0 + let downloadUpdates = 0 + + function onUploadUpdate() { + uploadUpdates++ } - { - // Start monitoring download - const monitor = mirror.drive.monitor(file) - await monitor.ready() - const expectedBlocks = [2, 1] - const expectedBytes = [bytes, 65536] - monitor.on('update', () => { - t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) - t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.downloadStats.targetBlocks, 2) - t.is(monitor.downloadStats.targetBytes, bytes) - t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) - if (!expectedBlocks.length) t.is(monitor.downloadStats.percentage, 100) - t.absent(monitor.uploadStats.blocks) - }) + function onDownloadUpdate() { + downloadUpdates++ } + uploadMonitor.on('update', onUploadUpdate) + downloadMonitor.on('update', onDownloadUpdate) + await mirror.drive.get(file) + + t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) + t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) + t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) + t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) + t.is(uploadMonitor.uploadStats.percentage, 100) + t.is(downloadMonitor.downloadStats.percentage, 100) + t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) + t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) + t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') + t.ok(downloadUpdates >= 2, 'download should emit multiple update events') + + uploadMonitor.removeListener('update', onUploadUpdate) + downloadMonitor.removeListener('update', onDownloadUpdate) + + await uploadMonitor.close() + await downloadMonitor.close() + t.pass('monitors closed') }) test('monitor is removed from the Set on close', async (t) => { @@ -1837,7 +1868,7 @@ test('monitor range download', async (t) => { await drive.put('/file-b', Buffer.alloc(1024)) await drive.put('/file-c', Buffer.alloc(1024)) - await eventFlush() + await ensureDbLength(mirror.drive, drive.version) const monitor = mirror.drive.monitor('download-monitor') await monitor.ready() @@ -2008,10 +2039,6 @@ async function streamToBuffer(stream) { return b4a.concat(chunks) } -function eventFlush() { - return new Promise((resolve) => setTimeout(resolve, 1000)) -} - async function replicate(drive, swarm, mirror) { swarm.on('connection', (conn) => drive.corestore.replicate(conn)) const discovery = swarm.join(drive.discoveryKey, { @@ -2058,5 +2085,44 @@ function replicateDebugStream(t, a, b, opts = {}) { } async function ensureDbLength(drive, length) { - await drive.checkout(length).db.core.get(length - 1) + while (drive.db.core.length < length) await once(drive.db.core, 'append') +} + +async function waitForAppendIfEmpty(core, message, timeout = 20000) { + if (core.length !== 0) return + await waitForEvent(core, 'append', () => core.length !== 0, timeout, message) +} + +async function waitForEvent( + emitter, + event, + predicate = null, + timeout = 20000, + message = `Timed out waiting for ${event}` +) { + await new Promise((resolve, reject) => { + let timer = null + + function cleanup() { + if (timer) clearTimeout(timer) + emitter.removeListener(event, onevent) + } + + function onevent(...args) { + if (predicate && !predicate(...args)) return + cleanup() + resolve() + } + + emitter.on(event, onevent) + timer = setTimeout(() => { + cleanup() + reject(new Error(message)) + }, timeout) + + if (predicate && predicate()) { + cleanup() + resolve() + } + }) }