diff --git a/README.md b/README.md index abcfd14ee..c9c98cede 100644 --- a/README.md +++ b/README.md @@ -490,6 +490,24 @@ Returns `null` if committing failed. Same as [`core.session(options)`](#const-session--coresessionoptions), but backed by a storage snapshot so will not truncate nor append. +#### `const readBatch = core.read()` + +Create a read batch. + +Reads made on the batch shall be condensed into a single read transaction on the underlying storage. + +Any blocks that are not in storage shall be fetched via replication, unless otherwise specified. + +#### `const promise = readBatch.get(index, opts)` + +Request a block from the read batch. `promise` shall resolve with the block or `null` after the batch is flushed. + +`opts` are identical to the `get` API. + +#### `readBatch.tryFlush()` + +Flush the read batch, any gets made on the batch will only resolve _after_ calling `tryFlush`. + #### `const info = await core.info([options])` Get information about this core, such as its total size in bytes. diff --git a/index.js b/index.js index 7ab24f8a6..86fd156c2 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,7 @@ const Info = require('./lib/info') const Download = require('./lib/download') const DefaultEncryption = require('./lib/default-encryption') const caps = require('./lib/caps') +const ReadBatch = require('./lib/read-batch') const Replicator = require('./lib/replicator') const { manifestHash, createManifest } = require('./lib/verifier') const { ReadStream, WriteStream, ByteStream } = require('./lib/streams') @@ -87,6 +88,8 @@ class Hypercore extends EventEmitter { this._findingPeers = 0 this._active = opts.weak ? !!opts.active : opts.active !== false + this._readBatches = [] + this._sessionIndex = -1 this._stateIndex = -1 // maintained by session state this._monitorIndex = -1 // maintained by replication state @@ -449,6 +452,10 @@ class Hypercore extends EventEmitter { if (this.closed === true) return + for (const batch of this._readBatches.slice()) { + batch.destroy() + } + this.core.removeMonitor(this) this.state.removeSession(this) this._removeSession() @@ -755,6 +762,20 @@ class Hypercore extends EventEmitter { return count === end - start } + read() { + const read = new ReadBatch(this) + read.index = this._readBatches.push(read) - 1 + + return read + } + + _removeReadBatch(batch) { + const last = this._readBatches.pop() + if (last === batch) return + this._readBatches[batch.index] = last + last.index = batch.index + } + async get(index, opts) { if (this.opened === false) await this.opening if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey) @@ -762,18 +783,23 @@ class Hypercore extends EventEmitter { if (this.closing !== null) throw SESSION_CLOSED('cannot get on a closed session', this.discoveryKey) - const encoding = - (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding - if (this.onseq !== null) this.onseq(index, this) const req = this._get(index, opts) let block = await req + + return this._handleBlock(index, block, opts) + } + + async _handleBlock(index, block, opts) { if (!block) return null if (opts && opts.raw) return block + const encoding = + (opts && opts.valueEncoding && c.from(opts.valueEncoding)) || this.valueEncoding + if (this.encryption && (!opts || opts.decrypt !== false)) { // Copy the block as it might be shared with other sessions. block = b4a.from(block) diff --git a/lib/read-batch.js b/lib/read-batch.js new file mode 100644 index 000000000..839109b47 --- /dev/null +++ b/lib/read-batch.js @@ -0,0 +1,50 @@ +const { ASSERTION } = require('hypercore-errors') + +module.exports = class ReadBatch { + constructor(core) { + this.core = core + this.rx = core.state.storage.read() + + this.reads = new Map() + this.index = -1 + + this.destroyed = false + } + + async destroy() { + this.core._removeReadBatch(this) + this.destroyed = true + this.rx.destroy() + } + + async get(index, opts = {}) { + if (!isValidIndex(index)) throw ASSERTION('block index is invalid', this.discoveryKey) + + if (this.core.onseq !== null) this.core.onseq(index, this.core) + + const block = await this._get(index) + if (this.destroyed) return null + + if (block) return this.core._handleBlock(index, block, opts) + + return this.core.get(index, opts) + } + + _get(index) { + if (this.reads.has(index)) return this.reads.get(index) + + const promise = this.rx.getBlock(index) + this.reads.set(index, promise) + + return promise + } + + tryFlush() { + this.rx.tryFlush() + this.core._removeReadBatch(this) + } +} + +function isValidIndex(index) { + return index === 0 || index > 0 +} diff --git a/test/read-batch.js b/test/read-batch.js new file mode 100644 index 000000000..fd06f0288 --- /dev/null +++ b/test/read-batch.js @@ -0,0 +1,96 @@ +const test = require('brittle') +const b4a = require('b4a') +const HypercoreStorage = require('hypercore-storage') +const crypto = require('hypercore-crypto') + +const Hypercore = require('../') +const { create, replicate, eventFlush } = require('./helpers') + +test('basic', async function (t) { + const core = await create(t) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + const read = core.read() + + const b0 = read.get(0) + const b1 = read.get(1) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, b4a.from('world')) +}) + +test('replication', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + const read = other.read() + + const b0 = read.get(0) + const b1 = read.get(1, { wait: false }) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, null) +}) + +test('mixed replication', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + t.alike(await other.get(0), b4a.from('hello')) + + const read = other.read() + + const b0 = read.get(0) + const b1 = read.get(1) + + read.tryFlush() + + t.alike(await b0, b4a.from('hello')) + t.alike(await b1, b4a.from('world')) +}) + +test('destroy', async function (t) { + const core = await create(t) + const other = await create(t, core.key) + + await core.append('hello') + await core.append('world') + + t.is(core.length, 2) + + replicate(core, other, t) + + t.alike(await other.get(0), b4a.from('hello')) + + const read = other.read() + + const exception = t.exception(read.get(0), /Batch is destroyed/) + + read.destroy() + + await exception + + await t.execution(other.close()) +})