From 900c530cdf8038d5a1c130bb52b60e023b759665 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Mon, 13 Apr 2026 10:14:30 -0400 Subject: [PATCH 1/4] refactor --- package.json | 7 +- test/e2e.js | 473 +++++++++++++++++++++++ test/helpers.js | 226 +++++++++++ test/integration.js | 110 ++++++ test.js => test/unit.js | 822 +++------------------------------------- 5 files changed, 866 insertions(+), 772 deletions(-) create mode 100644 test/e2e.js create mode 100644 test/helpers.js create mode 100644 test/integration.js rename test.js => test/unit.js (61%) diff --git a/package.json b/package.json index 39a35da..c1a44c8 100644 --- a/package.json +++ b/package.json @@ -15,8 +15,11 @@ "format": "prettier --write .", "lint": "prettier --check . && lunte", "test": "npm run test:node && npm run test:bare", - "test:bare": "brittle-bare --coverage test.js", - "test:node": "brittle-node --coverage test.js" + "test:bare": "brittle-bare --coverage test/unit.js test/integration.js test/e2e.js", + "test:node": "brittle-node --coverage test/unit.js test/integration.js test/e2e.js", + "test:unit": "brittle-node test/unit.js", + "test:integration": "brittle-node test/integration.js", + "test:e2e": "brittle-node test/e2e.js" }, "author": "Holepunch", "license": "Apache-2.0", diff --git a/test/e2e.js b/test/e2e.js new file mode 100644 index 0000000..658e205 --- /dev/null +++ b/test/e2e.js @@ -0,0 +1,473 @@ +const test = require('brittle') +const b4a = require('b4a') +const { + e2eTestenv: testenv, + replicate, + ensureDbLength, + downloadShark, + waitForAppendIfEmpty, + waitForEvent +} = require('./helpers.js') + +test('drive.download(folder, [options])', async (t) => { + t.plan(7) + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const nil = b4a.from('nil') + + let count = 0 + let max = -Infinity + + await drive.put('/parent/child/grandchild1', nil) + await drive.put('/parent/child/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + const blobs = await mirror.drive.getBlobs() + + blobs.core.on('download', (offset) => { + count++ + if (max < offset) max = offset + }) + + const l = drive.blobs.core.length + + await drive.put('/parent/sibling/grandchild1', nil) + + t.is(count, 0) + const download = mirror.drive.download('/parent/child') + await download.done() + t.is(max, l - 1) + const _count = count + t.ok(await mirror.drive.get('/parent/child/grandchild1')) + t.is(_count, count) + t.ok(await mirror.drive.get('/parent/child/grandchild2')) + t.is(_count, count) + const entry = await mirror.drive.entry('/parent/sibling/grandchild1') + await blobs.get(entry.value.blob) + t.is(count, _count + 1) +}) + +test('drive.download(filename, [options])', async (t) => { + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const nil = b4a.from('nil') + + await drive.put('/parent/grandchild1', nil) + await drive.put('/file', nil) + await drive.put('/parent/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + await mirror.drive.getBlobs() + const download = mirror.drive.download('/file') + await download.done() + + t.ok(await mirror.drive.get('/file', { wait: false })) + + try { + await mirror.drive.get('/file1', { wait: false }) + } catch { + t.pass('not downloaded') + } +}) + +test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + await drive.put('/file-a', Buffer.alloc(1024)) + await drive.put('/file-b', Buffer.alloc(1024)) + await drive.put('/file-c', Buffer.alloc(1024)) + + while (mirror.drive.version < drive.version) { + await new Promise((resolve) => setTimeout(resolve, 100)) + } + + const blobCore = (await mirror.drive.getBlobs()).core + + const fileTelem = downloadShark(mirror.drive.core) + const blobTelem = downloadShark(blobCore) + + const download = await mirror.drive.downloadRange( + [ + { start: 1, end: 2 }, + { start: 2, end: 3 } + ], + [{ start: 0, end: 3 }] + ) + await download.done() + + t.is(fileTelem.count, 2) + t.is(blobTelem.count, 3) +}) + +test('drive.downloadDiff(version, folder, [options])', async (t) => { + const { drive, swarm, mirror, corestore } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const nil = b4a.from('nil') + const version = drive.version + + await drive.put('/parent/child/0', nil) + await drive.put('/parent/sibling/0', nil) + await drive.put('/parent/child/1', nil) + + while (mirror.drive.version < drive.version) { + await new Promise((resolve) => setTimeout(resolve, 100)) + } + + const blobCore = (await mirror.drive.getBlobs()).core + + const filestelem = downloadShark(mirror.drive.core) + const blobstelem = downloadShark(blobCore) + + const downloadDiff = await mirror.drive.downloadDiff(version, '/parent/child') + await downloadDiff.done() + + const filescount = filestelem.count + const blobscount = blobstelem.count + + await mirror.drive.get('/parent/child/1') + + t.is(filescount, filestelem.count) + t.is(blobscount, blobstelem.count) +}) + +test('drive.has(path)', async (t) => { + t.plan(8) + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const nil = b4a.from('nil') + + await drive.put('/parent/child/grandchild1', nil) + await drive.put('/parent/child/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + t.absent(await mirror.drive.has('/parent/child/')) + t.absent(await mirror.drive.has('/parent/child/grandchild2')) + t.absent(await mirror.drive.has('/non-existent.txt'), 'returns false for non-existent files') + t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') + + await drive.put('/parent/sibling/grandchild1', nil) + await ensureDbLength(mirror.drive, drive.version) + + const downloadChild = mirror.drive.download('/parent/child/') + await downloadChild.done() + + t.ok(await mirror.drive.has('/parent/child/')) + t.absent(await mirror.drive.has('/parent/')) + + const downloadSibling = mirror.drive.download('/parent/sibling/') + await downloadSibling.done() + + t.ok(await mirror.drive.has('/parent/')) + t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) +}) + +test('drive.findingPeers()', async (t) => { + t.plan(2) + const { drive, corestore, swarm, mirror } = await testenv(t) + await drive.put('/a', b4a.from('a')) + + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + + const done = mirror.drive.findingPeers() + const updating = mirror.drive.update({ wait: true }) + try { + await Promise.all([waitForEvent(mirror.swarm, 'connection'), mirror.swarm.flush()]) + } finally { + done() + } + + t.ok(await updating) + t.alike(await mirror.drive.get('/a'), b4a.from('a')) +}) + +test('drive.entry(key, { timeout })', async (t) => { + t.plan(1) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + await swarm.destroy() + await drive.close() + + try { + await mirror.drive.entry('/file.txt', { timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('drive.entry(key, { wait })', async (t) => { + t.plan(1) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + await swarm.destroy() + await drive.close() + + try { + await mirror.drive.entry('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive.get(key, { timeout })', async (t) => { + t.plan(3) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + await ensureDbLength(mirror.drive, drive.version) + + const entry = await mirror.drive.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + + await swarm.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('drive.get(key, { wait }) with entry but no blob', async (t) => { + t.plan(3) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + const mirrorCheckout = mirror.drive.checkout(2) + const entry = await mirrorCheckout.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + await mirrorCheckout.close() + + await swarm.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive.get(key, { wait }) without entry', async (t) => { + t.plan(1) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + await swarm.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive peek with get() and timeout', async (t) => { + t.plan(3) + + const { drive, swarm, mirror } = await testenv(t) + await replicate(drive, swarm, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await ensureDbLength(mirror.drive, drive.version) + + const entry = await mirror.drive.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + + try { + await mirror.drive.get('/file.txt', { start: 100, timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('download can be destroyed', async (t) => { + t.plan(1) + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) + + await ensureDbLength(mirror.drive, drive.version) + const blobs = await mirror.drive.getBlobs() + + const download = mirror.drive.download('/file') + await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') + download.destroy() + + // not needed, just for test timing + await download.close() + + t.ok(blobs.core.contiguousLength < blobs.core.length) +}) + +test('upload/download can be monitored', async (t) => { + t.plan(16) + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const file = '/example.md' + const bytes = 1024 * 100 + const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) + await ensureDbLength(mirror.drive, drive.version) + + const uploadMonitor = drive.monitor(file) + await uploadMonitor.ready() + t.is(uploadMonitor.name, file) + t.is(uploadMonitor.uploadStats.targetBytes, bytes) + t.ok(uploadMonitor.uploadStats.targetBlocks > 0) + + const downloadMonitor = mirror.drive.monitor(file) + await downloadMonitor.ready() + t.is(downloadMonitor.downloadStats.targetBytes, bytes) + t.ok(downloadMonitor.downloadStats.targetBlocks > 0) + + let uploadUpdates = 0 + let downloadUpdates = 0 + + function onUploadUpdate() { + uploadUpdates++ + } + + function onDownloadUpdate() { + downloadUpdates++ + } + + uploadMonitor.on('update', onUploadUpdate) + downloadMonitor.on('update', onDownloadUpdate) + + await mirror.drive.get(file) + + t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) + t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) + t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) + t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) + t.is(uploadMonitor.uploadStats.percentage, 100) + t.is(downloadMonitor.downloadStats.percentage, 100) + t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) + t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) + t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') + t.ok(downloadUpdates >= 2, 'download should emit multiple update events') + + uploadMonitor.removeListener('update', onUploadUpdate) + downloadMonitor.removeListener('update', onDownloadUpdate) + + await uploadMonitor.close() + await downloadMonitor.close() + t.pass('monitors closed') +}) + +test('monitor range download', async (t) => { + const { corestore, drive, swarm, mirror } = await testenv(t) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + await drive.put('/file-a', Buffer.alloc(1024)) + await drive.put('/file-b', Buffer.alloc(1024)) + await drive.put('/file-c', Buffer.alloc(1024)) + + await ensureDbLength(mirror.drive, drive.version) + + const monitor = mirror.drive.monitor('download-monitor') + await monitor.ready() + + const download = await mirror.drive.downloadRange([], [{ start: 0, end: 3 }]) + await download.done() + + t.is(monitor.downloadStats.peers, 1) + t.ok(monitor.downloadStats.speed > 0) + t.ok(monitor.downloadStats.blocks > 0) + t.ok(monitor.downloadStats.totalBytes, 3072) +}) diff --git a/test/helpers.js b/test/helpers.js new file mode 100644 index 0000000..11dd5ab --- /dev/null +++ b/test/helpers.js @@ -0,0 +1,226 @@ +const fs = require('fs') +const path = require('path') +const { once } = require('events') +const Corestore = require('corestore') +const Hyperdrive = require('../index.js') +const testnet = require('hyperdht/testnet') +const DHT = require('hyperdht') +const Hyperswarm = require('hyperswarm') +const b4a = require('b4a') +const getTmpDir = require('test-tmp') +const DebuggingStream = require('debugging-stream') + +const pkgRoot = path.join(__dirname, '..') +const sampleFile = path.join(pkgRoot, 'index.js') + +async function localTestenv(t) { + const { teardown } = t + + const corestore = new Corestore(await t.tmp()) + await corestore.ready() + + const drive = new Hyperdrive(corestore) + await drive.ready() + teardown(drive.close.bind(drive)) + + const tmp = await getTmpDir(t) + const paths = { tmp, root: pkgRoot } + + return { corestore, drive, paths } +} + +async function localTestenvWithMirror(t) { + const base = await localTestenv(t) + + const mirrorCorestore = new Corestore(await t.tmp()) + await mirrorCorestore.ready() + t.teardown(mirrorCorestore.close.bind(mirrorCorestore)) + + const mirrorDrive = new Hyperdrive(mirrorCorestore, base.drive.key) + await mirrorDrive.ready() + t.teardown(mirrorDrive.close.bind(mirrorDrive)) + + return { + ...base, + mirror: { + corestore: mirrorCorestore, + drive: mirrorDrive + } + } +} + +async function e2eTestenv(t) { + const { teardown } = t + + const corestore = new Corestore(await t.tmp()) + await corestore.ready() + + const drive = new Hyperdrive(corestore) + await drive.ready() + teardown(drive.close.bind(drive)) + + const net = await testnet(2, { teardown }) + const { bootstrap } = net + const swarm = new Hyperswarm({ dht: new DHT({ bootstrap }) }) + teardown(swarm.destroy.bind(swarm)) + + const mirror = {} + mirror.swarm = new Hyperswarm({ dht: new DHT({ bootstrap }) }) + teardown(mirror.swarm.destroy.bind(mirror.swarm)) + mirror.corestore = new Corestore(await t.tmp()) + mirror.drive = new Hyperdrive(mirror.corestore, drive.key) + await mirror.drive.ready() + teardown(mirror.drive.close.bind(mirror.drive)) + + const tmp = await getTmpDir(t) + const paths = { tmp, root: pkgRoot } + + return { net, paths, corestore, drive, swarm, mirror } +} + +async function* readdirator( + parent, + { + readdir = fs.readdirSync, + isDirectory = (x) => fs.statSync(x).isDirectory(), + filter = () => true + } = {} +) { + for await (const child of readdir(parent)) { + const next = path.join(parent, child) + try { + if (!filter(child)) continue + if (await isDirectory(next)) yield* readdirator(next) + else yield next + } catch { + continue + } + } +} + +function filter(x) { + return !/node_modules|\.git/.test(x) +} + +function downloadShark(core) { + const telem = { offsets: [], count: 0 } + core.on('download', (offset) => { + telem.count++ + telem.offsets.push(offset) + }) + return telem +} + +async function streamToBuffer(stream) { + const chunks = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + return b4a.concat(chunks) +} + +async function replicate(drive, swarm, mirror) { + swarm.on('connection', (conn) => drive.corestore.replicate(conn)) + const discovery = swarm.join(drive.discoveryKey, { + server: true, + client: false + }) + await discovery.flushed() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() +} + +function replicateDebugStream(t, a, b, opts = {}) { + const { latency, speed, jitter } = opts + + const s1 = a.replicate(true, { keepAlive: false, ...opts }) + const s2Base = b.replicate(false, { keepAlive: false, ...opts }) + const s2 = new DebuggingStream(s2Base, { latency, speed, jitter }) + + s1.on('error', (err) => t.comment(`replication stream error (initiator): ${err}`)) + s2.on('error', (err) => t.comment(`replication stream error (responder): ${err}`)) + + if (opts.teardown !== false) { + t.teardown(async function () { + let missing = 2 + await new Promise((resolve) => { + s1.on('close', onclose) + s1.destroy() + + s2.on('close', onclose) + s2.destroy() + + function onclose() { + if (--missing === 0) resolve() + } + }) + }) + } + + s1.pipe(s2).pipe(s1) + + return [s1, s2] +} + +async function ensureDbLength(drive, length) { + while (drive.db.core.length < length) await once(drive.db.core, 'append') +} + +async function waitForAppendIfEmpty(core, message, timeout = 20000) { + if (core.length !== 0) return + await waitForEvent(core, 'append', () => core.length !== 0, timeout, message) +} + +async function waitForEvent( + emitter, + event, + predicate = null, + timeout = 20000, + message = `Timed out waiting for ${event}` +) { + await new Promise((resolve, reject) => { + let timer = null + + function cleanup() { + if (timer) clearTimeout(timer) + emitter.removeListener(event, onevent) + } + + function onevent(...args) { + if (predicate && !predicate(...args)) return + cleanup() + resolve() + } + + emitter.on(event, onevent) + timer = setTimeout(() => { + cleanup() + reject(new Error(message)) + }, timeout) + + if (predicate && predicate()) { + cleanup() + resolve() + } + }) +} + +module.exports = { + pkgRoot, + sampleFile, + localTestenv, + localTestenvWithMirror, + e2eTestenv, + readdirator, + filter, + downloadShark, + streamToBuffer, + replicate, + replicateDebugStream, + ensureDbLength, + waitForAppendIfEmpty, + waitForEvent, + Hyperdrive +} diff --git a/test/integration.js b/test/integration.js new file mode 100644 index 0000000..77ce39e --- /dev/null +++ b/test/integration.js @@ -0,0 +1,110 @@ +const fs = require('fs') +const { once } = require('events') +const test = require('brittle') +const Corestore = require('corestore') +const b4a = require('b4a') +const Hyperdrive = require('../index.js') +const { + localTestenv: testenv, + localTestenvWithMirror: testenvWithMirror, + replicateDebugStream, + ensureDbLength, + sampleFile +} = require('./helpers.js') + +test('Hyperdrive(corestore, key)', async (t) => { + t.plan(2) + const { corestore, drive } = await testenv(t) + const diskbuf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, diskbuf) + const bndlbuf = await drive.get(sampleFile) + t.is(b4a.compare(diskbuf, bndlbuf), 0) + const mirror = new Hyperdrive(corestore.session({ writable: false }), drive.core.key) + await mirror.ready() + const mrrrbuf = await mirror.get(sampleFile) + t.is(b4a.compare(bndlbuf, mrrrbuf), 0) +}) + +test('drive.get(path, { wait: false }) throws if entry exists but not found', async (t) => { + const { drive, mirror } = await testenvWithMirror(t) + + const otherDrive = mirror.drive + const s1 = drive.corestore.replicate(true) + const s2 = otherDrive.corestore.replicate(false) + s1.pipe(s2).pipe(s1) + + await drive.put('/file', 'content') + await ensureDbLength(otherDrive, drive.version) + + await otherDrive.entry('/file') // Ensure in bee + + await t.exception(() => otherDrive.get('/file', { wait: false }), /BLOCK_NOT_AVAILABLE/) + t.is( + b4a.toString(await otherDrive.get('/file')), + 'content', + 'sanity check: can actually get content' + ) +}) + +test('drive.mirror()', async (t) => { + const { drive: a } = await testenv(t) + const { drive: b } = await testenv(t) + + await a.put('/file.txt', 'hello world') + await a.mirror(b).done() + + t.alike(await b.get('/file.txt'), b4a.from('hello world')) +}) + +test('basic writable option', async function (t) { + t.plan(3) + + const store = new Corestore(await t.tmp()) + + const a = new Hyperdrive(store) + await a.put('/file-one', 'hi') + + const b = new Hyperdrive(store.session({ writable: false }), a.key) + await b.ready() + t.is(b.writable, false) + t.is(b.blobs.core.writable, false) + + try { + await b.put('/file-two', 'hi') + t.fail('Should have failed') + } catch (err) { + t.is(err.code, 'SESSION_NOT_WRITABLE') + } + + await a.close() + await b.close() +}) + +test('getBlobsLength large db - prefetch', async (t) => { + const store = new Corestore(await t.tmp()) + t.teardown(() => store.close()) + const a = new Hyperdrive(store.session()) + t.teardown(() => a.close()) + + const num = 1_000 + for (let i = 0; i < num; i++) { + await a.put('./file' + i, 'here') + } + + const store2 = new Corestore(await t.tmp()) + t.teardown(() => store2.close()) + + const b = new Hyperdrive(store2.session(), a.key) + t.teardown(() => b.close()) + + const gotAppend = once(b.core, 'append') + replicateDebugStream(t, a, b, { latency: 10 }) + await gotAppend + + t.is(await b.getBlobsLength(), await a.getBlobsLength(), 'blob lengths match') + t.comment('wireRequest sent', b.core.replicator.stats.wireRequest.tx) + t.ok( + b.core.replicator.stats.wireRequest.tx < 1.1 * num, + 'synced within a reasonable amount of requests' + ) +}) diff --git a/test.js b/test/unit.js similarity index 61% rename from test.js rename to test/unit.js index 43b0229..8c0580c 100644 --- a/test.js +++ b/test/unit.js @@ -5,14 +5,17 @@ const test = require('brittle') const Corestore = require('corestore') const { discoveryKey } = require('hypercore-crypto') const { pipelinePromise: pipeline, Writable, Readable } = require('streamx') -const testnet = require('hyperdht/testnet') -const DHT = require('hyperdht') -const Hyperswarm = require('hyperswarm') const b4a = require('b4a') const getTmpDir = require('test-tmp') const unixPathResolve = require('unix-path-resolve') -const DebuggingStream = require('debugging-stream') -const Hyperdrive = require('./index.js') +const Hyperdrive = require('../index.js') +const { + localTestenv: testenv, + readdirator, + filter, + streamToBuffer, + sampleFile +} = require('./helpers.js') test('drive.core', async (t) => { const { drive } = await testenv(t) @@ -21,7 +24,7 @@ test('drive.core', async (t) => { test('drive.version', async (t) => { const { drive } = await testenv(t) - await drive.put(__filename, fs.readFileSync(__filename)) + await drive.put(sampleFile, fs.readFileSync(sampleFile)) t.is(drive.db.feed.length, drive.version) }) @@ -51,25 +54,12 @@ test('drive.supportsMetadata', async (t) => { t.is(true, drive.supportsMetadata) }) -test('Hyperdrive(corestore, key)', async (t) => { - t.plan(2) - const { corestore, drive } = await testenv(t) - const diskbuf = fs.readFileSync(__filename) - await drive.put(__filename, diskbuf) - const bndlbuf = await drive.get(__filename) - t.is(b4a.compare(diskbuf, bndlbuf), 0) - const mirror = new Hyperdrive(corestore.session({ writable: false }), drive.core.key) - await mirror.ready() - const mrrrbuf = await mirror.get(__filename) - t.is(b4a.compare(bndlbuf, mrrrbuf), 0) -}) - test('drive.put(path, buf) and drive.get(path)', async (t) => { { const { drive } = await testenv(t) - const diskbuf = fs.readFileSync(__filename) - await drive.put(__filename, diskbuf) - const bndlbuf = await drive.get(__filename) + const diskbuf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, diskbuf) + const bndlbuf = await drive.get(sampleFile) t.is(b4a.compare(diskbuf, bndlbuf), 0) } @@ -86,35 +76,14 @@ test('drive.put(path, buf) and drive.get(path)', async (t) => { } }) -test('drive.get(path, { wait: false }) throws if entry exists but not found', async (t) => { - const { drive, mirror } = await testenv(t) - - const otherDrive = mirror.drive - const s1 = drive.corestore.replicate(true) - const s2 = otherDrive.corestore.replicate(false) - s1.pipe(s2).pipe(s1) - - await drive.put('/file', 'content') - await ensureDbLength(otherDrive, drive.version) - - await otherDrive.entry('/file') // Ensure in bee - - await t.exception(() => otherDrive.get('/file', { wait: false }), /BLOCK_NOT_AVAILABLE/) - t.is( - b4a.toString(await otherDrive.get('/file')), - 'content', - 'sanity check: can actually get content' - ) -}) - test('drive.createWriteStream(path) and drive.createReadStream(path)', async (t) => { { const { drive } = await testenv(t) - const diskbuf = await fs.readFileSync(__filename) - await pipeline(fs.createReadStream(__filename), drive.createWriteStream(__filename)) + const diskbuf = await fs.readFileSync(sampleFile) + await pipeline(fs.createReadStream(sampleFile), drive.createWriteStream(sampleFile)) let bndlbuf = null await pipeline( - drive.createReadStream(__filename), + drive.createReadStream(sampleFile), new Writable({ write(data, cb) { if (bndlbuf) bndlbuf = b4a.concat(bndlbuf, data) @@ -165,25 +134,25 @@ test('drive.createReadStream() with start/end options', async (t) => { test('drive.del() deletes entry at path', async (t) => { t.plan(3) const { drive } = await testenv(t) - await drive.put(__filename, fs.readFileSync(__filename)) - let buf = await drive.get(__filename) + await drive.put(sampleFile, fs.readFileSync(sampleFile)) + let buf = await drive.get(sampleFile) t.ok(b4a.isBuffer(buf)) - await drive.del(__filename) - buf = await drive.get(__filename) + await drive.del(sampleFile) + buf = await drive.get(sampleFile) t.is(buf, null) - const entry = await drive.entry(__filename) + const entry = await drive.entry(sampleFile) t.is(entry, null) }) test('drive.symlink(from, to) updates the entry at to include a reference for ', async (t) => { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf) - await drive.symlink('pointer', __filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf) + await drive.symlink('pointer', sampleFile) const result = await drive.get('pointer') t.is(result, null) const entry = await drive.entry('pointer') - t.is(entry.value.linkname, __filename) + t.is(entry.value.linkname, sampleFile) t.is(b4a.compare(buf, await drive.get(entry.value.linkname)), 0) }) @@ -192,9 +161,9 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf) - const { value: entry } = await drive.entry(__filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, false) @@ -202,9 +171,9 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf, { executable: false }) - const { value: entry } = await drive.entry(__filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf, { executable: false }) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, false) @@ -212,9 +181,9 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf, { executable: true }) - const { value: entry } = await drive.entry(__filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf, { executable: true }) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, true) @@ -222,10 +191,10 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf, { executable: false }) - await drive.symlink(__filename, linkname) - const { value: entry } = await drive.entry(__filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf, { executable: false }) + await drive.symlink(sampleFile, linkname) + const { value: entry } = await drive.entry(sampleFile) t.is(entry.blob, null) t.is(entry.executable, false) t.is(entry.linkname, linkname) @@ -233,10 +202,10 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const buf = fs.readFileSync(__filename) - await drive.put(__filename, buf, { executable: true }) - await drive.symlink(__filename, linkname) - const { value: entry } = await drive.entry(__filename) + const buf = fs.readFileSync(sampleFile) + await drive.put(sampleFile, buf, { executable: true }) + await drive.symlink(sampleFile, linkname) + const { value: entry } = await drive.entry(sampleFile) t.is(entry.blob, null) t.is(entry.executable, false) t.is(entry.linkname, linkname) @@ -244,20 +213,20 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - await drive.symlink(linkname, __filename) + await drive.symlink(linkname, sampleFile) const { value: entry } = await drive.entry(linkname) t.is(entry.blob, null) t.is(entry.executable, false) - t.is(entry.linkname, __filename) + t.is(entry.linkname, sampleFile) } { const { drive } = await testenv(t) - const ws = drive.createWriteStream(__filename) - ws.write(fs.readFileSync(__filename)) + const ws = drive.createWriteStream(sampleFile) + ws.write(fs.readFileSync(sampleFile)) ws.end() await once(ws, 'finish') - const { value: entry } = await drive.entry(__filename) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, false) @@ -265,11 +234,11 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const ws = drive.createWriteStream(__filename, { executable: false }) - ws.write(fs.readFileSync(__filename)) + const ws = drive.createWriteStream(sampleFile, { executable: false }) + ws.write(fs.readFileSync(sampleFile)) ws.end() await once(ws, 'finish') - const { value: entry } = await drive.entry(__filename) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, false) @@ -277,11 +246,11 @@ test('drive.entry(path) gets entry at path', async (t) => { { const { drive } = await testenv(t) - const ws = drive.createWriteStream(__filename, { executable: true }) - ws.write(fs.readFileSync(__filename)) + const ws = drive.createWriteStream(sampleFile, { executable: true }) + ws.write(fs.readFileSync(sampleFile)) ws.end() await once(ws, 'finish') - const { value: entry } = await drive.entry(__filename) + const { value: entry } = await drive.entry(sampleFile) t.ok(entry.blob) t.is(entry.linkname, null) t.is(entry.executable, true) @@ -665,196 +634,6 @@ test('drive.checkout(len)', async (t) => { } }) -test('drive.download(folder, [options])', async (t) => { - t.plan(7) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - let count = 0 - let max = -Infinity - - await drive.put('/parent/child/grandchild1', nil) - await drive.put('/parent/child/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - const blobs = await mirror.drive.getBlobs() - - blobs.core.on('download', (offset) => { - count++ - if (max < offset) max = offset - }) - - const l = drive.blobs.core.length - - await drive.put('/parent/sibling/grandchild1', nil) - - t.is(count, 0) - const download = mirror.drive.download('/parent/child') - await download.done() - t.is(max, l - 1) - const _count = count - t.ok(await mirror.drive.get('/parent/child/grandchild1')) - t.is(_count, count) - t.ok(await mirror.drive.get('/parent/child/grandchild2')) - t.is(_count, count) - const entry = await mirror.drive.entry('/parent/sibling/grandchild1') - await blobs.get(entry.value.blob) - t.is(count, _count + 1) -}) - -test('drive.download(filename, [options])', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - await drive.put('/parent/grandchild1', nil) - await drive.put('/file', nil) - await drive.put('/parent/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - await mirror.drive.getBlobs() - const download = mirror.drive.download('/file') - await download.done() - - t.ok(await mirror.drive.get('/file', { wait: false })) - - try { - await mirror.drive.get('/file1', { wait: false }) - } catch { - t.pass('not downloaded') - } -}) - -test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file-a', Buffer.alloc(1024)) - await drive.put('/file-b', Buffer.alloc(1024)) - await drive.put('/file-c', Buffer.alloc(1024)) - - while (mirror.drive.version < drive.version) { - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - const blobCore = (await mirror.drive.getBlobs()).core - - const fileTelem = downloadShark(mirror.drive.core) - const blobTelem = downloadShark(blobCore) - - const download = await mirror.drive.downloadRange( - [ - { start: 1, end: 2 }, - { start: 2, end: 3 } - ], - [{ start: 0, end: 3 }] - ) - await download.done() - - t.is(fileTelem.count, 2) - t.is(blobTelem.count, 3) -}) - -test('drive.downloadDiff(version, folder, [options])', async (t) => { - const { drive, swarm, mirror, corestore } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - const version = drive.version - - await drive.put('/parent/child/0', nil) - await drive.put('/parent/sibling/0', nil) - await drive.put('/parent/child/1', nil) - - while (mirror.drive.version < drive.version) { - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - const blobCore = (await mirror.drive.getBlobs()).core - - const filestelem = downloadShark(mirror.drive.core) - const blobstelem = downloadShark(blobCore) - - const downloadDiff = await mirror.drive.downloadDiff(version, '/parent/child') - await downloadDiff.done() - - const filescount = filestelem.count - const blobscount = blobstelem.count - - await mirror.drive.get('/parent/child/1') - - t.is(filescount, filestelem.count) - t.is(blobscount, blobstelem.count) -}) - -test('drive.has(path)', async (t) => { - t.plan(8) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - await drive.put('/parent/child/grandchild1', nil) - await drive.put('/parent/child/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - t.absent(await mirror.drive.has('/parent/child/')) - t.absent(await mirror.drive.has('/parent/child/grandchild2')) - t.absent(await mirror.drive.has('/non-existent.txt'), 'returns false for non-existent files') - t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') - - await drive.put('/parent/sibling/grandchild1', nil) - await ensureDbLength(mirror.drive, drive.version) - - const downloadChild = mirror.drive.download('/parent/child/') - await downloadChild.done() - - t.ok(await mirror.drive.has('/parent/child/')) - t.absent(await mirror.drive.has('/parent/')) - - const downloadSibling = mirror.drive.download('/parent/sibling/') - await downloadSibling.done() - - t.ok(await mirror.drive.has('/parent/')) - t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) -}) - test('drive.batch() & drive.flush()', async (t) => { const { drive } = await testenv(t) @@ -956,40 +735,6 @@ test('drive.close() with openBlobsFromHeader waiting in the background', async ( t.ok(drive.corestore.closed) }) -test('drive.findingPeers()', async (t) => { - t.plan(2) - const { drive, corestore, swarm, mirror } = await testenv(t) - await drive.put('/a', b4a.from('a')) - - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - - const done = mirror.drive.findingPeers() - const updating = mirror.drive.update({ wait: true }) - try { - await Promise.all([waitForEvent(mirror.swarm, 'connection'), mirror.swarm.flush()]) - } finally { - done() - } - - t.ok(await updating) - t.alike(await mirror.drive.get('/a'), b4a.from('a')) -}) - -test('drive.mirror()', async (t) => { - const { drive: a } = await testenv(t) - const { drive: b } = await testenv(t) - - await a.put('/file.txt', 'hello world') - await a.mirror(b).done() - - t.alike(await b.get('/file.txt'), b4a.from('hello world')) -}) - test('blobs with writable drive', async (t) => { t.plan(4) @@ -1174,30 +919,6 @@ test('basic properties', async function (t) { await drive.close() }) -test('basic writable option', async function (t) { - t.plan(3) - - const store = new Corestore(await t.tmp()) - - const a = new Hyperdrive(store) - await a.put('/file-one', 'hi') - - const b = new Hyperdrive(store.session({ writable: false }), a.key) - await b.ready() - t.is(b.writable, false) - t.is(b.blobs.core.writable, false) - - try { - await b.put('/file-two', 'hi') - t.fail('Should have failed') - } catch (err) { - t.is(err.code, 'SESSION_NOT_WRITABLE') - } - - await a.close() - await b.close() -}) - test('readdir filenames with dashes', async function (t) { t.plan(2) @@ -1339,138 +1060,6 @@ test('non-existing follow entry', async function (t) { await drive.close() }) -test('drive.entry(key, { timeout })', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.entry('/file.txt', { timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - -test('drive.entry(key, { wait })', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.entry('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive.get(key, { timeout })', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - await ensureDbLength(mirror.drive, drive.version) - - const entry = await mirror.drive.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - -test('drive.get(key, { wait }) with entry but no blob', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - const mirrorCheckout = mirror.drive.checkout(2) - const entry = await mirrorCheckout.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - await mirrorCheckout.close() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive.get(key, { wait }) without entry', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive peek with get() and timeout', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await ensureDbLength(mirror.drive, drive.version) - - const entry = await mirror.drive.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - - try { - await mirror.drive.get('/file.txt', { start: 100, timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - test('non-compat making of cores', async (t) => { const corestore = new Corestore(await t.tmp()) const drive = new Hyperdrive(corestore, { compat: false }) @@ -1527,35 +1116,6 @@ test('getBlobsLength of empty drive', async (t) => { await corestore.close() }) -test('getBlobsLength large db - prefetch', async (t) => { - const store = new Corestore(await t.tmp()) - t.teardown(() => store.close()) - const a = new Hyperdrive(store.session()) - t.teardown(() => a.close()) - - const num = 1_000 - for (let i = 0; i < num; i++) { - await a.put('./file' + i, 'here') - } - - const store2 = new Corestore(await t.tmp()) - t.teardown(() => store2.close()) - - const b = new Hyperdrive(store2.session(), a.key) - t.teardown(() => b.close()) - - const gotAppend = once(b.core, 'append') - replicateDebugStream(t, a, b, { latency: 10 }) - await gotAppend - - t.is(await b.getBlobsLength(), await a.getBlobsLength(), 'blob lengths match') - t.comment('wireRequest sent', b.core.replicator.stats.wireRequest.tx) - t.ok( - b.core.replicator.stats.wireRequest.tx < 1.1 * num, - 'synced within a reasonable amount of requests' - ) -}) - test('truncate happy path', async (t) => { const corestore = new Corestore(await t.tmp()) const drive = new Hyperdrive(corestore.session()) @@ -1755,96 +1315,6 @@ test('drive.list ignore and unignore', async (t) => { t.alike(entries, expectedEntries) }) - -test('download can be destroyed', async (t) => { - t.plan(1) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) - - await ensureDbLength(mirror.drive, drive.version) - const blobs = await mirror.drive.getBlobs() - - const download = mirror.drive.download('/file') - await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') - download.destroy() - - // not needed, just for test timing - await download.close() - - t.ok(blobs.core.contiguousLength < blobs.core.length) -}) - -test('upload/download can be monitored', async (t) => { - t.plan(16) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const file = '/example.md' - const bytes = 1024 * 100 - const buffer = Buffer.alloc(bytes, '0') - await drive.put(file, buffer) - await ensureDbLength(mirror.drive, drive.version) - - const uploadMonitor = drive.monitor(file) - await uploadMonitor.ready() - t.is(uploadMonitor.name, file) - t.is(uploadMonitor.uploadStats.targetBytes, bytes) - t.ok(uploadMonitor.uploadStats.targetBlocks > 0) - - const downloadMonitor = mirror.drive.monitor(file) - await downloadMonitor.ready() - t.is(downloadMonitor.downloadStats.targetBytes, bytes) - t.ok(downloadMonitor.downloadStats.targetBlocks > 0) - - let uploadUpdates = 0 - let downloadUpdates = 0 - - function onUploadUpdate() { - uploadUpdates++ - } - - function onDownloadUpdate() { - downloadUpdates++ - } - - uploadMonitor.on('update', onUploadUpdate) - downloadMonitor.on('update', onDownloadUpdate) - - await mirror.drive.get(file) - - t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) - t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) - t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) - t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) - t.is(uploadMonitor.uploadStats.percentage, 100) - t.is(downloadMonitor.downloadStats.percentage, 100) - t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) - t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) - t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') - t.ok(downloadUpdates >= 2, 'download should emit multiple update events') - - uploadMonitor.removeListener('update', onUploadUpdate) - downloadMonitor.removeListener('update', onDownloadUpdate) - - await uploadMonitor.close() - await downloadMonitor.close() - t.pass('monitors closed') -}) - test('monitor is removed from the Set on close', async (t) => { const { drive } = await testenv(t) const monitor = drive.monitor('/example.md') @@ -1853,35 +1323,6 @@ test('monitor is removed from the Set on close', async (t) => { await monitor.close() t.is(drive.monitors.size, 0) }) - -test('monitor range download', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file-a', Buffer.alloc(1024)) - await drive.put('/file-b', Buffer.alloc(1024)) - await drive.put('/file-c', Buffer.alloc(1024)) - - await ensureDbLength(mirror.drive, drive.version) - - const monitor = mirror.drive.monitor('download-monitor') - await monitor.ready() - - const download = await mirror.drive.downloadRange([], [{ start: 0, end: 3 }]) - await download.done() - - t.is(monitor.downloadStats.peers, 1) - t.ok(monitor.downloadStats.speed > 0) - t.ok(monitor.downloadStats.blocks > 0) - t.ok(monitor.downloadStats.totalBytes, 3072) -}) - test('dedup mode', async (t) => { const { drive } = await testenv(t) @@ -1967,162 +1408,3 @@ test('write after close should not corrupt drive', async (t) => { await platformCorestore.close() }) - -async function testenv(t) { - const { teardown } = t - - const corestore = new Corestore(await t.tmp()) - await corestore.ready() - - const drive = new Hyperdrive(corestore) - await drive.ready() - teardown(drive.close.bind(drive)) - - const net = await testnet(2, { teardown }) - const { bootstrap } = net - const swarm = new Hyperswarm({ dht: new DHT({ bootstrap }) }) - teardown(swarm.destroy.bind(swarm)) - - const mirror = {} - mirror.swarm = new Hyperswarm({ dht: new DHT({ bootstrap }) }) - teardown(mirror.swarm.destroy.bind(mirror.swarm)) - mirror.corestore = new Corestore(await t.tmp()) - mirror.drive = new Hyperdrive(mirror.corestore, drive.key) - await mirror.drive.ready() - teardown(mirror.drive.close.bind(mirror.drive)) - - const tmp = await getTmpDir(t) - const root = __dirname - const paths = { tmp, root } - - return { net, paths, corestore, drive, swarm, mirror } -} - -async function* readdirator( - parent, - { - readdir = fs.readdirSync, - isDirectory = (x) => fs.statSync(x).isDirectory(), - filter = () => true - } = {} -) { - for await (const child of readdir(parent)) { - const next = path.join(parent, child) - try { - if (!filter(child)) continue - if (await isDirectory(next)) yield* readdirator(next) - else yield next - } catch { - continue - } - } -} - -function filter(x) { - return !/node_modules|\.git/.test(x) -} - -function downloadShark(core) { - const telem = { offsets: [], count: 0 } - core.on('download', (offset) => { - telem.count++ - telem.offsets.push(offset) - }) - return telem -} - -async function streamToBuffer(stream) { - const chunks = [] - for await (const chunk of stream) { - chunks.push(chunk) - } - return b4a.concat(chunks) -} - -async function replicate(drive, swarm, mirror) { - swarm.on('connection', (conn) => drive.corestore.replicate(conn)) - const discovery = swarm.join(drive.discoveryKey, { - server: true, - client: false - }) - await discovery.flushed() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() -} - -function replicateDebugStream(t, a, b, opts = {}) { - const { latency, speed, jitter } = opts - - const s1 = a.replicate(true, { keepAlive: false, ...opts }) - const s2Base = b.replicate(false, { keepAlive: false, ...opts }) - const s2 = new DebuggingStream(s2Base, { latency, speed, jitter }) - - s1.on('error', (err) => t.comment(`replication stream error (initiator): ${err}`)) - s2.on('error', (err) => t.comment(`replication stream error (responder): ${err}`)) - - if (opts.teardown !== false) { - t.teardown(async function () { - let missing = 2 - await new Promise((resolve) => { - s1.on('close', onclose) - s1.destroy() - - s2.on('close', onclose) - s2.destroy() - - function onclose() { - if (--missing === 0) resolve() - } - }) - }) - } - - s1.pipe(s2).pipe(s1) - - return [s1, s2] -} - -async function ensureDbLength(drive, length) { - while (drive.db.core.length < length) await once(drive.db.core, 'append') -} - -async function waitForAppendIfEmpty(core, message, timeout = 20000) { - if (core.length !== 0) return - await waitForEvent(core, 'append', () => core.length !== 0, timeout, message) -} - -async function waitForEvent( - emitter, - event, - predicate = null, - timeout = 20000, - message = `Timed out waiting for ${event}` -) { - await new Promise((resolve, reject) => { - let timer = null - - function cleanup() { - if (timer) clearTimeout(timer) - emitter.removeListener(event, onevent) - } - - function onevent(...args) { - if (predicate && !predicate(...args)) return - cleanup() - resolve() - } - - emitter.on(event, onevent) - timer = setTimeout(() => { - cleanup() - reject(new Error(message)) - }, timeout) - - if (predicate && predicate()) { - cleanup() - resolve() - } - }) -} From 4925203f0fa500f286523236c8d7a0b96cbf74c8 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Mon, 13 Apr 2026 11:19:03 -0400 Subject: [PATCH 2/4] refactoring --- test/e2e.js | 450 +------------------------------------------- test/helpers.js | 15 ++ test/integration.js | 398 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 414 insertions(+), 449 deletions(-) diff --git a/test/e2e.js b/test/e2e.js index 658e205..1e8ed4a 100644 --- a/test/e2e.js +++ b/test/e2e.js @@ -1,207 +1,10 @@ const test = require('brittle') const b4a = require('b4a') -const { - e2eTestenv: testenv, - replicate, - ensureDbLength, - downloadShark, - waitForAppendIfEmpty, - waitForEvent -} = require('./helpers.js') - -test('drive.download(folder, [options])', async (t) => { - t.plan(7) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - let count = 0 - let max = -Infinity - - await drive.put('/parent/child/grandchild1', nil) - await drive.put('/parent/child/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - const blobs = await mirror.drive.getBlobs() - - blobs.core.on('download', (offset) => { - count++ - if (max < offset) max = offset - }) - - const l = drive.blobs.core.length - - await drive.put('/parent/sibling/grandchild1', nil) - - t.is(count, 0) - const download = mirror.drive.download('/parent/child') - await download.done() - t.is(max, l - 1) - const _count = count - t.ok(await mirror.drive.get('/parent/child/grandchild1')) - t.is(_count, count) - t.ok(await mirror.drive.get('/parent/child/grandchild2')) - t.is(_count, count) - const entry = await mirror.drive.entry('/parent/sibling/grandchild1') - await blobs.get(entry.value.blob) - t.is(count, _count + 1) -}) - -test('drive.download(filename, [options])', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - await drive.put('/parent/grandchild1', nil) - await drive.put('/file', nil) - await drive.put('/parent/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - await mirror.drive.getBlobs() - const download = mirror.drive.download('/file') - await download.done() - - t.ok(await mirror.drive.get('/file', { wait: false })) - - try { - await mirror.drive.get('/file1', { wait: false }) - } catch { - t.pass('not downloaded') - } -}) - -test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file-a', Buffer.alloc(1024)) - await drive.put('/file-b', Buffer.alloc(1024)) - await drive.put('/file-c', Buffer.alloc(1024)) - - while (mirror.drive.version < drive.version) { - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - const blobCore = (await mirror.drive.getBlobs()).core - - const fileTelem = downloadShark(mirror.drive.core) - const blobTelem = downloadShark(blobCore) - - const download = await mirror.drive.downloadRange( - [ - { start: 1, end: 2 }, - { start: 2, end: 3 } - ], - [{ start: 0, end: 3 }] - ) - await download.done() - - t.is(fileTelem.count, 2) - t.is(blobTelem.count, 3) -}) - -test('drive.downloadDiff(version, folder, [options])', async (t) => { - const { drive, swarm, mirror, corestore } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - const version = drive.version - - await drive.put('/parent/child/0', nil) - await drive.put('/parent/sibling/0', nil) - await drive.put('/parent/child/1', nil) - - while (mirror.drive.version < drive.version) { - await new Promise((resolve) => setTimeout(resolve, 100)) - } - - const blobCore = (await mirror.drive.getBlobs()).core - - const filestelem = downloadShark(mirror.drive.core) - const blobstelem = downloadShark(blobCore) - - const downloadDiff = await mirror.drive.downloadDiff(version, '/parent/child') - await downloadDiff.done() - - const filescount = filestelem.count - const blobscount = blobstelem.count - - await mirror.drive.get('/parent/child/1') - - t.is(filescount, filestelem.count) - t.is(blobscount, blobstelem.count) -}) - -test('drive.has(path)', async (t) => { - t.plan(8) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const nil = b4a.from('nil') - - await drive.put('/parent/child/grandchild1', nil) - await drive.put('/parent/child/grandchild2', nil) - - await ensureDbLength(mirror.drive, drive.version) - - t.absent(await mirror.drive.has('/parent/child/')) - t.absent(await mirror.drive.has('/parent/child/grandchild2')) - t.absent(await mirror.drive.has('/non-existent.txt'), 'returns false for non-existent files') - t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') - - await drive.put('/parent/sibling/grandchild1', nil) - await ensureDbLength(mirror.drive, drive.version) - - const downloadChild = mirror.drive.download('/parent/child/') - await downloadChild.done() - - t.ok(await mirror.drive.has('/parent/child/')) - t.absent(await mirror.drive.has('/parent/')) - - const downloadSibling = mirror.drive.download('/parent/sibling/') - await downloadSibling.done() - - t.ok(await mirror.drive.has('/parent/')) - t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) -}) +const { e2eTestenv, waitForEvent } = require('./helpers.js') test('drive.findingPeers()', async (t) => { t.plan(2) - const { drive, corestore, swarm, mirror } = await testenv(t) + const { drive, corestore, swarm, mirror } = await e2eTestenv(t) await drive.put('/a', b4a.from('a')) swarm.on('connection', (conn) => corestore.replicate(conn)) @@ -222,252 +25,3 @@ test('drive.findingPeers()', async (t) => { t.ok(await updating) t.alike(await mirror.drive.get('/a'), b4a.from('a')) }) - -test('drive.entry(key, { timeout })', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.entry('/file.txt', { timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - -test('drive.entry(key, { wait })', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.entry('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive.get(key, { timeout })', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - await ensureDbLength(mirror.drive, drive.version) - - const entry = await mirror.drive.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - -test('drive.get(key, { wait }) with entry but no blob', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - const mirrorCheckout = mirror.drive.checkout(2) - const entry = await mirrorCheckout.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - await mirrorCheckout.close() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive.get(key, { wait }) without entry', async (t) => { - t.plan(1) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await mirror.drive.getBlobs() - - await swarm.destroy() - await drive.close() - - try { - await mirror.drive.get('/file.txt', { wait: false }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'BLOCK_NOT_AVAILABLE') - } -}) - -test('drive peek with get() and timeout', async (t) => { - t.plan(3) - - const { drive, swarm, mirror } = await testenv(t) - await replicate(drive, swarm, mirror) - - await drive.put('/file.txt', b4a.from('hi')) - await ensureDbLength(mirror.drive, drive.version) - - const entry = await mirror.drive.entry('/file.txt') - t.ok(entry) - t.ok(entry.value.blob) - - try { - await mirror.drive.get('/file.txt', { start: 100, timeout: 1 }) - t.fail('should have failed') - } catch (error) { - t.is(error.code, 'REQUEST_TIMEOUT') - } -}) - -test('download can be destroyed', async (t) => { - t.plan(1) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) - - await ensureDbLength(mirror.drive, drive.version) - const blobs = await mirror.drive.getBlobs() - - const download = mirror.drive.download('/file') - await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') - download.destroy() - - // not needed, just for test timing - await download.close() - - t.ok(blobs.core.contiguousLength < blobs.core.length) -}) - -test('upload/download can be monitored', async (t) => { - t.plan(16) - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const file = '/example.md' - const bytes = 1024 * 100 - const buffer = Buffer.alloc(bytes, '0') - await drive.put(file, buffer) - await ensureDbLength(mirror.drive, drive.version) - - const uploadMonitor = drive.monitor(file) - await uploadMonitor.ready() - t.is(uploadMonitor.name, file) - t.is(uploadMonitor.uploadStats.targetBytes, bytes) - t.ok(uploadMonitor.uploadStats.targetBlocks > 0) - - const downloadMonitor = mirror.drive.monitor(file) - await downloadMonitor.ready() - t.is(downloadMonitor.downloadStats.targetBytes, bytes) - t.ok(downloadMonitor.downloadStats.targetBlocks > 0) - - let uploadUpdates = 0 - let downloadUpdates = 0 - - function onUploadUpdate() { - uploadUpdates++ - } - - function onDownloadUpdate() { - downloadUpdates++ - } - - uploadMonitor.on('update', onUploadUpdate) - downloadMonitor.on('update', onDownloadUpdate) - - await mirror.drive.get(file) - - t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) - t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) - t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) - t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) - t.is(uploadMonitor.uploadStats.percentage, 100) - t.is(downloadMonitor.downloadStats.percentage, 100) - t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) - t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) - t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') - t.ok(downloadUpdates >= 2, 'download should emit multiple update events') - - uploadMonitor.removeListener('update', onUploadUpdate) - downloadMonitor.removeListener('update', onDownloadUpdate) - - await uploadMonitor.close() - await downloadMonitor.close() - t.pass('monitors closed') -}) - -test('monitor range download', async (t) => { - const { corestore, drive, swarm, mirror } = await testenv(t) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - await drive.put('/file-a', Buffer.alloc(1024)) - await drive.put('/file-b', Buffer.alloc(1024)) - await drive.put('/file-c', Buffer.alloc(1024)) - - await ensureDbLength(mirror.drive, drive.version) - - const monitor = mirror.drive.monitor('download-monitor') - await monitor.ready() - - const download = await mirror.drive.downloadRange([], [{ start: 0, end: 3 }]) - await download.done() - - t.is(monitor.downloadStats.peers, 1) - t.ok(monitor.downloadStats.speed > 0) - t.ok(monitor.downloadStats.blocks > 0) - t.ok(monitor.downloadStats.totalBytes, 3072) -}) diff --git a/test/helpers.js b/test/helpers.js index 11dd5ab..2777995 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -168,6 +168,19 @@ async function ensureDbLength(drive, length) { while (drive.db.core.length < length) await once(drive.db.core, 'append') } +function pipeReplicate(drive, mirror) { + const s1 = drive.corestore.replicate(true, { keepAlive: false }) + const s2 = mirror.corestore.replicate(false, { keepAlive: false }) + s1.pipe(s2).pipe(s1) + return [s1, s2] +} + +async function syncDriveVersion(mirrorDrive, targetVersion) { + while (mirrorDrive.version < targetVersion) { + await once(mirrorDrive.db.core, 'append') + } +} + async function waitForAppendIfEmpty(core, message, timeout = 20000) { if (core.length !== 0) return await waitForEvent(core, 'append', () => core.length !== 0, timeout, message) @@ -220,6 +233,8 @@ module.exports = { replicate, replicateDebugStream, ensureDbLength, + pipeReplicate, + syncDriveVersion, waitForAppendIfEmpty, waitForEvent, Hyperdrive diff --git a/test/integration.js b/test/integration.js index 77ce39e..a7764b0 100644 --- a/test/integration.js +++ b/test/integration.js @@ -8,7 +8,11 @@ const { localTestenv: testenv, localTestenvWithMirror: testenvWithMirror, replicateDebugStream, + pipeReplicate, + syncDriveVersion, ensureDbLength, + downloadShark, + waitForAppendIfEmpty, sampleFile } = require('./helpers.js') @@ -80,7 +84,7 @@ test('basic writable option', async function (t) { await b.close() }) -test('getBlobsLength large db - prefetch', async (t) => { +test('getBlobsLength large db - prefetch', { timeout: 120_000 }, async (t) => { const store = new Corestore(await t.tmp()) t.teardown(() => store.close()) const a = new Hyperdrive(store.session()) @@ -108,3 +112,395 @@ test('getBlobsLength large db - prefetch', async (t) => { 'synced within a reasonable amount of requests' ) }) + +test('drive.download(folder, [options])', async (t) => { + t.plan(7) + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + const nil = b4a.from('nil') + + let count = 0 + let max = -Infinity + + await drive.put('/parent/child/grandchild1', nil) + await drive.put('/parent/child/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + const blobs = await mirror.drive.getBlobs() + + blobs.core.on('download', (offset) => { + count++ + if (max < offset) max = offset + }) + + const l = drive.blobs.core.length + + await drive.put('/parent/sibling/grandchild1', nil) + + t.is(count, 0) + const download = mirror.drive.download('/parent/child') + await download.done() + t.is(max, l - 1) + const _count = count + t.ok(await mirror.drive.get('/parent/child/grandchild1')) + t.is(_count, count) + t.ok(await mirror.drive.get('/parent/child/grandchild2')) + t.is(_count, count) + const entry = await mirror.drive.entry('/parent/sibling/grandchild1') + await blobs.get(entry.value.blob) + t.is(count, _count + 1) +}) + +test('drive.download(filename, [options])', async (t) => { + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + const nil = b4a.from('nil') + + await drive.put('/parent/grandchild1', nil) + await drive.put('/file', nil) + await drive.put('/parent/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + await mirror.drive.getBlobs() + const download = mirror.drive.download('/file') + await download.done() + + t.ok(await mirror.drive.get('/file', { wait: false })) + + try { + await mirror.drive.get('/file1', { wait: false }) + } catch { + t.pass('not downloaded') + } +}) + +test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + await drive.put('/file-a', Buffer.alloc(1024)) + await drive.put('/file-b', Buffer.alloc(1024)) + await drive.put('/file-c', Buffer.alloc(1024)) + + await syncDriveVersion(mirror.drive, drive.version) + + const blobCore = (await mirror.drive.getBlobs()).core + + const fileTelem = downloadShark(mirror.drive.core) + const blobTelem = downloadShark(blobCore) + + const download = await mirror.drive.downloadRange( + [ + { start: 1, end: 2 }, + { start: 2, end: 3 } + ], + [{ start: 0, end: 3 }] + ) + await download.done() + + t.is(fileTelem.count, 2) + t.is(blobTelem.count, 3) +}) + +test('drive.downloadDiff(version, folder, [options])', async (t) => { + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + const nil = b4a.from('nil') + const version = drive.version + + await drive.put('/parent/child/0', nil) + await drive.put('/parent/sibling/0', nil) + await drive.put('/parent/child/1', nil) + + await syncDriveVersion(mirror.drive, drive.version) + + const blobCore = (await mirror.drive.getBlobs()).core + + const filestelem = downloadShark(mirror.drive.core) + const blobstelem = downloadShark(blobCore) + + const downloadDiff = await mirror.drive.downloadDiff(version, '/parent/child') + await downloadDiff.done() + + const filescount = filestelem.count + const blobscount = blobstelem.count + + await mirror.drive.get('/parent/child/1') + + t.is(filescount, filestelem.count) + t.is(blobscount, blobstelem.count) +}) + +test('drive.has(path)', async (t) => { + t.plan(8) + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + const nil = b4a.from('nil') + + await drive.put('/parent/child/grandchild1', nil) + await drive.put('/parent/child/grandchild2', nil) + + await ensureDbLength(mirror.drive, drive.version) + + t.absent(await mirror.drive.has('/parent/child/')) + t.absent(await mirror.drive.has('/parent/child/grandchild2')) + t.absent(await mirror.drive.has('/non-existent.txt'), 'returns false for non-existent files') + t.absent(await mirror.drive.has('/non-existent/'), 'returns false for non-existent directory') + + await drive.put('/parent/sibling/grandchild1', nil) + await ensureDbLength(mirror.drive, drive.version) + + const downloadChild = mirror.drive.download('/parent/child/') + await downloadChild.done() + + t.ok(await mirror.drive.has('/parent/child/')) + t.absent(await mirror.drive.has('/parent/')) + + const downloadSibling = mirror.drive.download('/parent/sibling/') + await downloadSibling.done() + + t.ok(await mirror.drive.has('/parent/')) + t.ok(await mirror.drive.has('/parent/sibling/grandchild1')) +}) + +test('drive.entry(key, { timeout })', async (t) => { + t.plan(1) + + const { drive, mirror } = await testenvWithMirror(t) + const [s1, s2] = pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + s1.destroy() + s2.destroy() + await drive.close() + + try { + await mirror.drive.entry('/file.txt', { timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('drive.entry(key, { wait })', async (t) => { + t.plan(1) + + const { drive, mirror } = await testenvWithMirror(t) + const [s1, s2] = pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + s1.destroy() + s2.destroy() + await drive.close() + + try { + await mirror.drive.entry('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive.get(key, { timeout })', async (t) => { + t.plan(3) + + const { drive, mirror } = await testenvWithMirror(t) + const [s1, s2] = pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + await ensureDbLength(mirror.drive, drive.version) + + const entry = await mirror.drive.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + + s1.destroy() + s2.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('drive.get(key, { wait }) with entry but no blob', async (t) => { + t.plan(3) + + const { drive, mirror } = await testenvWithMirror(t) + const [s1, s2] = pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + const mirrorCheckout = mirror.drive.checkout(2) + const entry = await mirrorCheckout.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + await mirrorCheckout.close() + + s1.destroy() + s2.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive.get(key, { wait }) without entry', async (t) => { + t.plan(1) + + const { drive, mirror } = await testenvWithMirror(t) + const [s1, s2] = pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await mirror.drive.getBlobs() + + s1.destroy() + s2.destroy() + await drive.close() + + try { + await mirror.drive.get('/file.txt', { wait: false }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'BLOCK_NOT_AVAILABLE') + } +}) + +test('drive peek with get() and timeout', async (t) => { + t.plan(3) + + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + await drive.put('/file.txt', b4a.from('hi')) + await ensureDbLength(mirror.drive, drive.version) + + const entry = await mirror.drive.entry('/file.txt') + t.ok(entry) + t.ok(entry.value.blob) + + try { + await mirror.drive.get('/file.txt', { start: 100, timeout: 1 }) + t.fail('should have failed') + } catch (error) { + t.is(error.code, 'REQUEST_TIMEOUT') + } +}) + +test('download can be destroyed', async (t) => { + t.plan(1) + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) + + await ensureDbLength(mirror.drive, drive.version) + const blobs = await mirror.drive.getBlobs() + + const download = mirror.drive.download('/file') + await waitForAppendIfEmpty(blobs.core, 'Timed out waiting for blobs length') + download.destroy() + + // not needed, just for test timing + await download.close() + + t.ok(blobs.core.contiguousLength < blobs.core.length) +}) + +test('upload/download can be monitored', async (t) => { + t.plan(16) + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + const file = '/example.md' + const bytes = 1024 * 100 + const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) + await ensureDbLength(mirror.drive, drive.version) + + const uploadMonitor = drive.monitor(file) + await uploadMonitor.ready() + t.is(uploadMonitor.name, file) + t.is(uploadMonitor.uploadStats.targetBytes, bytes) + t.ok(uploadMonitor.uploadStats.targetBlocks > 0) + + const downloadMonitor = mirror.drive.monitor(file) + await downloadMonitor.ready() + t.is(downloadMonitor.downloadStats.targetBytes, bytes) + t.ok(downloadMonitor.downloadStats.targetBlocks > 0) + + let uploadUpdates = 0 + let downloadUpdates = 0 + + function onUploadUpdate() { + uploadUpdates++ + } + + function onDownloadUpdate() { + downloadUpdates++ + } + + uploadMonitor.on('update', onUploadUpdate) + downloadMonitor.on('update', onDownloadUpdate) + + await mirror.drive.get(file) + + t.is(uploadMonitor.uploadStats.monitoringBytes, bytes) + t.is(downloadMonitor.downloadStats.monitoringBytes, bytes) + t.is(uploadMonitor.uploadStats.blocks, uploadMonitor.uploadStats.targetBlocks) + t.is(downloadMonitor.downloadStats.blocks, downloadMonitor.downloadStats.targetBlocks) + t.is(uploadMonitor.uploadStats.percentage, 100) + t.is(downloadMonitor.downloadStats.percentage, 100) + t.is(uploadMonitor.uploadSpeed(), uploadMonitor.uploadStats.speed) + t.is(downloadMonitor.downloadSpeed(), downloadMonitor.downloadStats.speed) + t.ok(uploadUpdates >= 2, 'upload should emit multiple update events') + t.ok(downloadUpdates >= 2, 'download should emit multiple update events') + + uploadMonitor.removeListener('update', onUploadUpdate) + downloadMonitor.removeListener('update', onDownloadUpdate) + + await uploadMonitor.close() + await downloadMonitor.close() + t.pass('monitors closed') +}) + +test('monitor range download', async (t) => { + const { drive, mirror } = await testenvWithMirror(t) + pipeReplicate(drive, mirror) + + await drive.put('/file-a', Buffer.alloc(1024)) + await drive.put('/file-b', Buffer.alloc(1024)) + await drive.put('/file-c', Buffer.alloc(1024)) + + await ensureDbLength(mirror.drive, drive.version) + + const monitor = mirror.drive.monitor('download-monitor') + await monitor.ready() + + const download = await mirror.drive.downloadRange([], [{ start: 0, end: 3 }]) + await download.done() + + t.is(monitor.downloadStats.peers, 1) + t.ok(monitor.downloadStats.speed > 0) + t.ok(monitor.downloadStats.blocks > 0) + t.is(monitor.downloadStats.totalBytes, 3072) +}) From 29e9a638a46a431c0c0c8646cd606c9a57bced98 Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Mon, 13 Apr 2026 11:32:43 -0400 Subject: [PATCH 3/4] more refactoring --- test/e2e.js | 11 +++-------- test/helpers.js | 8 ++------ test/integration.js | 4 +--- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/test/e2e.js b/test/e2e.js index 1e8ed4a..4d95161 100644 --- a/test/e2e.js +++ b/test/e2e.js @@ -1,18 +1,13 @@ const test = require('brittle') const b4a = require('b4a') -const { e2eTestenv, waitForEvent } = require('./helpers.js') +const { e2eTestenv, replicate, waitForEvent } = require('./helpers.js') test('drive.findingPeers()', async (t) => { t.plan(2) - const { drive, corestore, swarm, mirror } = await e2eTestenv(t) + const { drive, swarm, mirror } = await e2eTestenv(t) await drive.put('/a', b4a.from('a')) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await replicate(drive, swarm, mirror) const done = mirror.drive.findingPeers() const updating = mirror.drive.update({ wait: true }) diff --git a/test/helpers.js b/test/helpers.js index 2777995..566fa94 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -121,15 +121,11 @@ async function streamToBuffer(stream) { async function replicate(drive, swarm, mirror) { swarm.on('connection', (conn) => drive.corestore.replicate(conn)) - const discovery = swarm.join(drive.discoveryKey, { - server: true, - client: false - }) - await discovery.flushed() + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() } function replicateDebugStream(t, a, b, opts = {}) { diff --git a/test/integration.js b/test/integration.js index a7764b0..21cfd4a 100644 --- a/test/integration.js +++ b/test/integration.js @@ -33,9 +33,7 @@ test('drive.get(path, { wait: false }) throws if entry exists but not found', as const { drive, mirror } = await testenvWithMirror(t) const otherDrive = mirror.drive - const s1 = drive.corestore.replicate(true) - const s2 = otherDrive.corestore.replicate(false) - s1.pipe(s2).pipe(s1) + pipeReplicate(drive, otherDrive) await drive.put('/file', 'content') await ensureDbLength(otherDrive, drive.version) From 1bd054465616852cde8f7cccb815f1bfa4112a8c Mon Sep 17 00:00:00 2001 From: Daniel Christopher Date: Thu, 23 Apr 2026 09:41:47 -0400 Subject: [PATCH 4/4] refactors --- package.json | 10 ++++---- test/e2e.js | 6 ++--- test/helpers.js | 12 +++++----- test/{integration.js => replication.js} | 32 ++++++++++++------------- 4 files changed, 30 insertions(+), 30 deletions(-) rename test/{integration.js => replication.js} (95%) diff --git a/package.json b/package.json index c1a44c8..9a4610f 100644 --- a/package.json +++ b/package.json @@ -15,11 +15,11 @@ "format": "prettier --write .", "lint": "prettier --check . && lunte", "test": "npm run test:node && npm run test:bare", - "test:bare": "brittle-bare --coverage test/unit.js test/integration.js test/e2e.js", - "test:node": "brittle-node --coverage test/unit.js test/integration.js test/e2e.js", - "test:unit": "brittle-node test/unit.js", - "test:integration": "brittle-node test/integration.js", - "test:e2e": "brittle-node test/e2e.js" + "test:bare": "brittle-bare --coverage test/unit.js test/replication.js test/e2e.js", + "test:node": "brittle-node --coverage test/unit.js test/replication.js test/e2e.js", + "test:unit": "brittle-bare test/unit.js", + "test:replication": "brittle-bare test/replication.js", + "test:e2e": "brittle-bare test/e2e.js" }, "author": "Holepunch", "license": "Apache-2.0", diff --git a/test/e2e.js b/test/e2e.js index 4d95161..0516217 100644 --- a/test/e2e.js +++ b/test/e2e.js @@ -1,13 +1,13 @@ const test = require('brittle') const b4a = require('b4a') -const { e2eTestenv, replicate, waitForEvent } = require('./helpers.js') +const { e2eTestenv, swarm, waitForEvent } = require('./helpers.js') test('drive.findingPeers()', async (t) => { t.plan(2) - const { drive, swarm, mirror } = await e2eTestenv(t) + const { drive, hyperSwarm, mirror } = await e2eTestenv(t) await drive.put('/a', b4a.from('a')) - await replicate(drive, swarm, mirror) + await swarm(drive, hyperSwarm, mirror) const done = mirror.drive.findingPeers() const updating = mirror.drive.update({ wait: true }) diff --git a/test/helpers.js b/test/helpers.js index 566fa94..77e84cd 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -75,7 +75,7 @@ async function e2eTestenv(t) { const tmp = await getTmpDir(t) const paths = { tmp, root: pkgRoot } - return { net, paths, corestore, drive, swarm, mirror } + return { net, paths, corestore, drive, hyperSwarm: swarm, mirror } } async function* readdirator( @@ -99,7 +99,7 @@ async function* readdirator( } function filter(x) { - return !/node_modules|\.git/.test(x) + return !/^(?:node_modules|coverage|\.git)$/.test(x) } function downloadShark(core) { @@ -119,7 +119,7 @@ async function streamToBuffer(stream) { return b4a.concat(chunks) } -async function replicate(drive, swarm, mirror) { +async function swarm(drive, swarm, mirror) { swarm.on('connection', (conn) => drive.corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) await swarm.flush() @@ -164,7 +164,7 @@ async function ensureDbLength(drive, length) { while (drive.db.core.length < length) await once(drive.db.core, 'append') } -function pipeReplicate(drive, mirror) { +function replicate(drive, mirror) { const s1 = drive.corestore.replicate(true, { keepAlive: false }) const s2 = mirror.corestore.replicate(false, { keepAlive: false }) s1.pipe(s2).pipe(s1) @@ -226,10 +226,10 @@ module.exports = { filter, downloadShark, streamToBuffer, - replicate, + swarm, replicateDebugStream, ensureDbLength, - pipeReplicate, + replicate, syncDriveVersion, waitForAppendIfEmpty, waitForEvent, diff --git a/test/integration.js b/test/replication.js similarity index 95% rename from test/integration.js rename to test/replication.js index 21cfd4a..4d9a007 100644 --- a/test/integration.js +++ b/test/replication.js @@ -8,7 +8,7 @@ const { localTestenv: testenv, localTestenvWithMirror: testenvWithMirror, replicateDebugStream, - pipeReplicate, + replicate, syncDriveVersion, ensureDbLength, downloadShark, @@ -33,7 +33,7 @@ test('drive.get(path, { wait: false }) throws if entry exists but not found', as const { drive, mirror } = await testenvWithMirror(t) const otherDrive = mirror.drive - pipeReplicate(drive, otherDrive) + replicate(drive, otherDrive) await drive.put('/file', 'content') await ensureDbLength(otherDrive, drive.version) @@ -114,7 +114,7 @@ test('getBlobsLength large db - prefetch', { timeout: 120_000 }, async (t) => { test('drive.download(folder, [options])', async (t) => { t.plan(7) const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) const nil = b4a.from('nil') @@ -153,7 +153,7 @@ test('drive.download(folder, [options])', async (t) => { test('drive.download(filename, [options])', async (t) => { const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) const nil = b4a.from('nil') @@ -178,7 +178,7 @@ test('drive.download(filename, [options])', async (t) => { test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) await drive.put('/file-a', Buffer.alloc(1024)) await drive.put('/file-b', Buffer.alloc(1024)) @@ -206,7 +206,7 @@ test('drive.downloadRange(dbRanges, blobRanges)', async (t) => { test('drive.downloadDiff(version, folder, [options])', async (t) => { const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) const nil = b4a.from('nil') const version = drive.version @@ -237,7 +237,7 @@ test('drive.downloadDiff(version, folder, [options])', async (t) => { test('drive.has(path)', async (t) => { t.plan(8) const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) const nil = b4a.from('nil') @@ -271,7 +271,7 @@ test('drive.entry(key, { timeout })', async (t) => { t.plan(1) const { drive, mirror } = await testenvWithMirror(t) - const [s1, s2] = pipeReplicate(drive, mirror) + const [s1, s2] = replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await mirror.drive.getBlobs() @@ -292,7 +292,7 @@ test('drive.entry(key, { wait })', async (t) => { t.plan(1) const { drive, mirror } = await testenvWithMirror(t) - const [s1, s2] = pipeReplicate(drive, mirror) + const [s1, s2] = replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await mirror.drive.getBlobs() @@ -313,7 +313,7 @@ test('drive.get(key, { timeout })', async (t) => { t.plan(3) const { drive, mirror } = await testenvWithMirror(t) - const [s1, s2] = pipeReplicate(drive, mirror) + const [s1, s2] = replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await mirror.drive.getBlobs() @@ -339,7 +339,7 @@ test('drive.get(key, { wait }) with entry but no blob', async (t) => { t.plan(3) const { drive, mirror } = await testenvWithMirror(t) - const [s1, s2] = pipeReplicate(drive, mirror) + const [s1, s2] = replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await mirror.drive.getBlobs() @@ -366,7 +366,7 @@ test('drive.get(key, { wait }) without entry', async (t) => { t.plan(1) const { drive, mirror } = await testenvWithMirror(t) - const [s1, s2] = pipeReplicate(drive, mirror) + const [s1, s2] = replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await mirror.drive.getBlobs() @@ -387,7 +387,7 @@ test('drive peek with get() and timeout', async (t) => { t.plan(3) const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) await drive.put('/file.txt', b4a.from('hi')) await ensureDbLength(mirror.drive, drive.version) @@ -407,7 +407,7 @@ test('drive peek with get() and timeout', async (t) => { test('download can be destroyed', async (t) => { t.plan(1) const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) await drive.put('/file', b4a.allocUnsafe(1024 * 1024 * 30)) @@ -427,7 +427,7 @@ test('download can be destroyed', async (t) => { test('upload/download can be monitored', async (t) => { t.plan(16) const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) const file = '/example.md' const bytes = 1024 * 100 @@ -483,7 +483,7 @@ test('upload/download can be monitored', async (t) => { test('monitor range download', async (t) => { const { drive, mirror } = await testenvWithMirror(t) - pipeReplicate(drive, mirror) + replicate(drive, mirror) await drive.put('/file-a', Buffer.alloc(1024)) await drive.put('/file-b', Buffer.alloc(1024))