diff --git a/README.md b/README.md index b539edd96..1ed0182fe 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,7 @@ console.log('core was updated?', updated, 'length is', core.length) ``` js { wait: false, + length: 0 // minimum length (will hang until core.length is greater than or equal) activeRequests: undefined, // Advanced option. Pass requests for replicating blocks force: false, // Force an update even if core is writable. } diff --git a/index.js b/index.js index e8027992f..336cd6d63 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,7 @@ const Protomux = require('protomux') const id = require('hypercore-id-encoding') const safetyCatch = require('safety-catch') const unslab = require('unslab') +const rrp = require('resolve-reject-promise') const Core = require('./lib/core') const Info = require('./lib/info') @@ -681,6 +682,7 @@ class Hypercore extends EventEmitter { if (this.writable && (!opts || opts.force !== true)) return false + const minLength = opts?.length || 0 const remoteWait = this._shouldWait(opts, this.core.replicator.findingPeers > 0) let upgraded = false @@ -701,6 +703,25 @@ class Hypercore extends EventEmitter { } } + if (this.length < minLength) { + const { promise, resolve, reject } = rrp() + const onclose = () => { + reject(SESSION_CLOSED('Hypercore closed while waiting for length update')) + } + const onappend = () => { + if (this.length >= minLength) resolve() + } + this.on('close', onclose) + this.on('append', onappend) + try { + await promise + } finally { + this.off('close', onclose) + this.off('append', onappend) + } + upgraded = true + } + if (!upgraded) return false return true } diff --git a/package.json b/package.json index 0e9439307..1e1be6dd1 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,7 @@ "protomux": "^3.5.0", "quickbit-universal": "^2.2.0", "random-array-iterator": "^1.0.0", + "resolve-reject-promise": "^1.1.0", "safety-catch": "^1.0.1", "sodium-universal": "^5.0.1", "streamx": "^2.12.4", diff --git a/test/replicate.js b/test/replicate.js index cd69959e2..75ce32cf2 100644 --- a/test/replicate.js +++ b/test/replicate.js @@ -525,6 +525,50 @@ test('update with zero length', async function (t) { t.is(b.length, 0) }) +test('update with min length', async function (t) { + t.plan(2) + const a = await create(t) + const b = await create(t, a.key) + + replicate(a, b, t) + + let updateResolved = false + b.update({ length: 3 }) + .then(() => { + updateResolved = true + t.is(b.length, 3, 'update resolved at length 3') + }) + .catch((e) => { + console.error(e) + t.fail('unexpected error') + }) + + await a.append('block0') + await a.append('block1') + t.is(updateResolved, false, 'sanity check') + await a.append('block2') +}) + +test('update with min length throws if the core closes before reaching the length', async function (t) { + t.plan(1) + const a = await create(t) + const b = await create(t, a.key) + + replicate(a, b, t) + + b.update({ length: 3 }) + .then(() => { + t.fail('should not trigger') + }) + .catch((e) => { + t.is(e.code, 'SESSION_CLOSED') + }) + + await a.append('block0') + await a.append('block1') + await b.close() +}) + test('basic multiplexing', async function (t) { const a1 = await create(t) const a2 = await create(t)