From 2659624a3aa26ff93fa015cfcadf0b6fe925729f Mon Sep 17 00:00:00 2001 From: Christophe Diederichs Date: Thu, 9 Oct 2025 16:13:21 +0100 Subject: [PATCH 1/3] add read batches --- index.js | 16 +++++++++-- lib/read-batch.js | 39 +++++++++++++++++++++++++ test/read-batch.js | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 lib/read-batch.js create mode 100644 test/read-batch.js diff --git a/index.js b/index.js index 7ab24f8a6..23efa587b 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,7 @@ const Info = require('./lib/info') const Download = require('./lib/download') const DefaultEncryption = require('./lib/default-encryption') const caps = require('./lib/caps') +const ReadBatch = require('./lib/read-batch') const Replicator = require('./lib/replicator') const { manifestHash, createManifest } = require('./lib/verifier') const { ReadStream, WriteStream, ByteStream } = require('./lib/streams') @@ -755,6 +756,10 @@ class Hypercore extends EventEmitter { return count === end - start } + read() { + return new ReadBatch(this) + } + async get(index, opts) { if (this.opened === false) await this.opening if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey) @@ -762,18 +767,23 @@ class Hypercore extends EventEmitter { if (this.closing !== null) throw SESSION_CLOSED('cannot get on a closed session', this.discoveryKey) - const encoding = - (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding - if (this.onseq !== null) this.onseq(index, this) const req = this._get(index, opts) let block = await req + + return this._handleBlock(index, block, opts) + } + + async _handleBlock(index, block, opts) { if (!block) return null if (opts && opts.raw) return block + const encoding = + (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding + if (this.encryption && (!opts || opts.decrypt !== false)) { // Copy the block as it might be shared with other sessions. block = b4a.from(block) diff --git a/lib/read-batch.js b/lib/read-batch.js new file mode 100644 index 000000000..9f1ebc4a9 --- /dev/null +++ b/lib/read-batch.js @@ -0,0 +1,39 @@ +const { ASSERTION } = require('hypercore-errors') + +module.exports = class ReadBatch { + constructor(core) { + this.core = core + this.rx = core.state.storage.read() + + this.reads = new Map() + } + + async get(index, opts = {}) { + if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey) + + if (this.core.onseq !== null) this.core.onseq(index, this.core) + + const block = await this._get(index) + + if (block) return this.core._handleBlock(index, block, opts) + + return this.core.get(index, opts) + } + + _get(index) { + if (this.reads.has(index)) return this.reads.get(index) + + const promise = this.rx.getBlock(index) + this.reads.set(index, promise) + + return promise + } + + tryFlush() { + this.rx.tryFlush() + } +} + +function isValidIndex(index) { + return index === 0 || index > 0 +} diff --git a/test/read-batch.js b/test/read-batch.js new file mode 100644 index 000000000..e3c1f3595 --- /dev/null +++ b/test/read-batch.js @@ -0,0 +1,72 @@ +const test = require('brittle') +const b4a = require('b4a') +const HypercoreStorage = require('hypercore-storage') +const crypto = require('hypercore-crypto') + +const Hypercore = require('../') +const { create, replicate, eventFlush } = require('./helpers') + +test('basic', async function (t) { + const core = await create(t) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + const read = core.read() + + const b0 = read.get(0) + const b1 = read.get(1) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, b4a.from('world')) +}) + +test('replication', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + const read = other.read() + + const b0 = read.get(0) + const b1 = read.get(1, { wait: false }) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, null) +}) + +test('mixed', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + t.alike(await other.get(0), b4a.from('hello')) + + const read = other.read() + + const b0 = read.get(0) + const b1 = read.get(1) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, b4a.from('world')) +}) From fefee7eb4a9e7449d787c20a13796c316d8e4d52 Mon Sep 17 00:00:00 2001 From: Christophe Diederichs Date: Thu, 9 Oct 2025 16:36:14 +0100 Subject: [PATCH 2/3] add destroy api --- index.js | 18 +++++++++++++++++- lib/read-batch.js | 11 +++++++++++ test/read-batch.js | 26 +++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 23efa587b..86fd156c2 100644 --- a/index.js +++ b/index.js @@ -88,6 +88,8 @@ class Hypercore extends EventEmitter { this._findingPeers = 0 this._active = opts.weak ? !!opts.active : opts.active !== false + this._readBatches = [] + this._sessionIndex = -1 this._stateIndex = -1 // maintained by session state this._monitorIndex = -1 // maintained by replication state @@ -450,6 +452,10 @@ class Hypercore extends EventEmitter { if (this.closed === true) return + for (const batch of this._readBatches.slice()) { + batch.destroy() + } + this.core.removeMonitor(this) this.state.removeSession(this) this._removeSession() @@ -757,7 +763,17 @@ class Hypercore extends EventEmitter { } read() { - return new ReadBatch(this) + const read = new ReadBatch(this) + read.index = this._readBatches.push(read) - 1 + + return read + } + + _removeReadBatch(batch) { + const last = this._readBatches.pop() + if (last === batch) return + this._readBatches[batch.index] = last + last.index = batch.index } async get(index, opts) { diff --git a/lib/read-batch.js b/lib/read-batch.js index 9f1ebc4a9..839109b47 100644 --- a/lib/read-batch.js +++ b/lib/read-batch.js @@ -6,6 +6,15 @@ module.exports = class ReadBatch { this.rx = core.state.storage.read() this.reads = new Map() + this.index = -1 + + this.destroyed = false + } + + async destroy() { + this.core._removeReadBatch(this) + this.destroyed = true + this.rx.destroy() } async get(index, opts = {}) { @@ -14,6 +23,7 @@ module.exports = class ReadBatch { if (this.core.onseq !== null) this.core.onseq(index, this.core) const block = await this._get(index) + if (this.destroyed) return null if (block) return this.core._handleBlock(index, block, opts) @@ -31,6 +41,7 @@ module.exports = class ReadBatch { tryFlush() { this.rx.tryFlush() + this.core._removeReadBatch(this) } } diff --git a/test/read-batch.js b/test/read-batch.js index e3c1f3595..fd06f0288 100644 --- a/test/read-batch.js +++ b/test/read-batch.js @@ -47,7 +47,7 @@ test('replication', async function (t) { t.alike(await b1, null) }) -test('mixed', async function (t) { +test('mixed replication', async function (t) { const core = await create(t) const other = await create(t, core.key) @@ -70,3 +70,27 @@ test('mixed', async function (t) { t.alike(await b0, b4a.from('hello')) t.alike(await b1, b4a.from('world')) }) + +test('destroy', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + t.alike(await other.get(0), b4a.from('hello')) + + const read = other.read() + + const exception = t.exception(read.get(0), /Batch is destroyed/) + + read.destroy() + + await exception + + await t.execution(other.close()) +}) From 10fb0feb7c639a767612c1e2337444efb2aaa69f Mon Sep 17 00:00:00 2001 From: Christophe Diederichs Date: Thu, 9 Oct 2025 16:43:46 +0100 Subject: [PATCH 3/3] update readme --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index abcfd14ee..c9c98cede 100644 --- a/README.md +++ b/README.md @@ -490,6 +490,24 @@ Returns `null` if committing failed. Same as [`core.session(options)`](#const-session--coresessionoptions), but backed by a storage snapshot so will not truncate nor append. +#### `const readBatch = core.read()` + +Create a read batch. + +Reads made on the batch shall be condensed into a single read transaction on the underlying storage. + +Any blocks that are not in storage shall be fetched via replication, unless otherwise specified. + +#### `const promise = readBatch.get(index, opts)` + +Request a block from the read batch. `promise` shall resolve with the block or `null` after the batch is flushed. + +`opts` are identical to the `get` API. + +#### `readBatch.tryFlush()` + +Flush the read batch, any gets made on the batch will only resolve _after_ calling `tryFlush`. + #### `const info = await core.info([options])` Get information about this core, such as its total size in bytes.