diff --git a/index.js b/index.js index 4dd5b9f..e957f4f 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,9 @@ const mutexify = require('mutexify') const b4a = require('b4a') -const { BlobReadStream, BlobWriteStream } = require('./lib/streams') +const { BlockMapReadStream, BlobReadStream, BlobWriteStream } = require('./lib/streams') const Monitor = require('./lib/monitor') +const blockMap = require('./lib/block-map') const DEFAULT_BLOCK_SIZE = 2 ** 16 @@ -153,9 +154,7 @@ class Hyperblobs { } async get(id, opts) { - const all = - !opts || (!opts.start && opts.length === undefined && opts.end === undefined && !opts.core) - if (all) return this._getAll(id, opts) + if (isAll(id, opts)) return this._getAll(id, opts) const res = [] try { @@ -172,12 +171,18 @@ class Hyperblobs { } async clear(id, opts) { + if (id.blockMap) { + const map = await blockMap.get(this.core, id, { wait: false }) + if (map) { + for (const b of map.blocks) await this.core.clear(b.index, b.index + 1, opts) + } + } return this.core.clear(id.blockOffset, id.blockOffset + id.blockLength, opts) } createReadStream(id, opts) { const core = opts && opts.core ? opts.core : this.core - return new BlobReadStream(core, id, opts) + return id.blockMap ? 
new BlockMapReadStream(core, id, opts) : new BlobReadStream(core, id, opts) } createWriteStream(opts) { @@ -225,3 +230,12 @@ class Hyperblobs { } module.exports = Hyperblobs + +function isAll(id, opts) { + if (id.blockMap) return false + if (!opts) return true + if (opts.start) return false + if (opts.length !== undefined || opts.end !== undefined) return false + if (opts.core) return false + return true +} diff --git a/lib/block-map.js b/lib/block-map.js new file mode 100644 index 0000000..274b7d4 --- /dev/null +++ b/lib/block-map.js @@ -0,0 +1,157 @@ +const c = require('compact-encoding') +const b4a = require('b4a') +const crypto = require('hypercore-crypto') + +const block = { + preencode(state, m) { + c.uint.preencode(state, m.index) + c.uint.preencode(state, m.byteLength) + }, + encode(state, m) { + c.uint.encode(state, m.index) + c.uint.encode(state, m.byteLength) + }, + decode(state) { + return { + index: c.uint.decode(state), + byteLength: c.uint.decode(state) + } + } +} + +const list = c.array(block) + +const map = { + preencode(state, m) { + c.uint.preencode(state, 0) + list.preencode(state, m.blocks) + }, + encode(state, m) { + c.uint.encode(state, 0) + list.encode(state, m.blocks) + }, + decode(state) { + const version = c.uint.decode(state) + if (version > 0) throw new Error('Unsupported block map version') + + return { + version, + blocks: list.decode(state) + } + } +} + +exports.hash = hashId + +function hashId(block) { + return b4a.toString(crypto.hash(block), 'hex') +} + +exports.get = getBlockMap + +async function inferBlockMap(core, id, opts = {}) { + const hashes = !!opts.hashes + + const map = { + hashes: hashes ? 
new Map() : null, + blocks: [] + } + + for (let i = id.blockOffset; i < id.blockOffset + id.blockLength; i++) { + const block = await core.get(i) + const entry = { index: i, byteLength: block.byteLength } + map.blocks.push(entry) + if (hashes) map.hashes.set(hashId(block), entry) + } + + return map +} + +async function getBlockMap(core, id, opts = {}) { + if (!id.blockMap) return inferBlockMap(core, id, opts) + + if (id.blockLength > 64) { + throw new Error('Block map is too large') + } + + const hashes = !!opts.hashes + + const map = { + hashes: hashes ? new Map() : null, + blocks: null + } + + const promises = [] + + for (let i = id.blockOffset; i < id.blockOffset + id.blockLength; i++) { + promises.push(core.get(i)) + } + + const buffers = await Promise.all(promises) + const m = decodeBlockMap(buffers) + if (!m) return null + + map.blocks = m.blocks + + if (hashes && !core.writable) { + const blocks = [] + for (let i = 0; i < map.blocks.length; i++) blocks.push(map.blocks[i].index) + core.download({ blocks }) + } + + if (hashes) { + for (let i = 0; i < map.blocks.length; i++) { + const b = map.blocks[i] + const block = await core.get(b.index) + if (block === null) return null + map.hashes.set(hashId(block), b) + } + } + + return map +} + +exports.encode = encodeBlockMap + +function encodeBlockMap(header) { + const result = [] + + for (let i = 0; i < header.blocks.length; i += 8192) { + const blocks = + i === 0 && header.blocks.length < 8192 ? 
header.blocks : header.blocks.slice(i, i + 8192) + + const state = { start: 0, end: 0, buffer: null } + const m = { version: 0, blocks } + + map.preencode(state, m) + state.buffer = b4a.allocUnsafe(state.end) + map.encode(state, m) + + result.push(state.buffer) + } + + return result +} + +exports.decode = decodeBlockMap + +function decodeBlockMap(buffers) { + const result = { + version: 0, + blocks: null + } + + for (let i = 0; i < buffers.length; i++) { + if (!buffers[i]) return null + + const state = { start: 0, end: buffers[i].byteLength, buffer: buffers[i] } + const r = map.decode(state) + + result.version = r.version + + if (result.blocks) result.blocks.push(...r.blocks) + else result.blocks = r.blocks + } + + return result +} diff --git a/lib/streams.js b/lib/streams.js index 812148d..83c1cc1 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -1,38 +1,64 @@ const { Readable, Writable } = require('streamx') const { BLOCK_NOT_AVAILABLE } = require('hypercore-errors') const Prefetcher = require('./prefetcher') +const blockMap = require('./block-map') class BlobWriteStream extends Writable { - constructor(core, lock, opts) { + constructor(core, lock, opts = {}) { super(opts) - this.id = {} + this.id = { blockOffset: 0, byteOffset: 0, blockLength: 0, byteLength: 0 } this.core = core + + this._dedup = !!opts.dedup + this._blob = opts.blob || null + this._hashes = null + this._addBlockMap = !!opts.blockMap || this._dedup + this._blockMap = this._addBlockMap ? 
{ version: 0, blocks: [] } : null this._lock = lock this._release = null this._batch = [] + + if (this._addBlockMap) this.id.blockMap = true + } + + async _openp() { + await this.core.ready() + const release = await new Promise((resolve) => this._lock(resolve)) + + this._release = release + this.id.byteOffset = this.core.byteLength + this.id.blockOffset = this.core.length + + if (!this._dedup) return + + if (this._blob) { + const map = await blockMap.get(this.core, this._blob, { hashes: true }) + this._hashes = map.hashes + } else { + this._hashes = new Map() + } } _open(cb) { - this.core.ready().then( - () => { - this._lock((release) => { - this._release = release - this.id.byteOffset = this.core.byteLength - this.id.blockOffset = this.core.length - return cb(null) - }) - }, - (err) => cb(err) - ) + this._openp().then(cb, cb) + } + + async _finalp() { + await this._append() + + if (this._blockMap) { + const buffers = await blockMap.encode(this._blockMap) + this.id.blockOffset = this.core.length + this.id.byteOffset = this.core.byteLength + await this.core.append(buffers) + } + + this.id.blockLength = this.core.length - this.id.blockOffset + this.id.byteLength = this.core.byteLength - this.id.byteOffset } _final(cb) { - this._append((err) => { - if (err) return cb(err) - this.id.blockLength = this.core.length - this.id.blockOffset - this.id.byteLength = this.core.byteLength - this.id.byteOffset - return cb(null) - }) + this._finalp().then(cb, cb) } _destroy(cb) { @@ -40,24 +66,153 @@ class BlobWriteStream extends Writable { cb(null) } - _append(cb) { - if (!this._batch.length) return cb(null) - return this.core.append(this._batch).then( - () => { - this._batch = [] - return cb(null) - }, - (err) => { - this._batch = [] - return cb(err) - } - ) + async _append() { + if (!this._batch.length) return + + const batch = this._batch + this._batch = [] + + await this.core.append(batch) } _write(data, cb) { + let dup = false + + if (this._blockMap) { + let entry = { + 
index: this.core.length + this._batch.length, + byteLength: data.byteLength + } + + if (this._hashes) { + const id = blockMap.hash(data) + const existing = this._hashes.get(id) + + if (existing) { + entry = existing + dup = true + } else { + this._hashes.set(id, entry) + } + } + + this._blockMap.blocks.push(entry) + } + + if (dup) return cb() + this._batch.push(data) - if (this._batch.length >= 16) return this._append(cb) - return cb(null) + + if (this._batch.length >= 16) { + this._append().then(cb, cb) + return + } + + return cb() + } +} + +class BlockMapReadStream extends Readable { + constructor(core, id, opts = {}) { + super(opts) + this.id = id + this.core = core.session({ wait: opts.wait, timeout: opts.timeout }) + + const noPrefetch = opts.wait === false || opts.prefetch === false || !core.core + const start = opts.start || 0 + const end = + opts.end === undefined ? (opts.length === undefined ? -1 : start + opts.length) : opts.end + 1 + + this._blockMap = null + + this._rangeStart = start + this._rangeEnd = end + + this._startIndex = 0 + this._startOffset = 0 + this._endIndex = -1 + this._endOffset = -1 + + this._range = null + this._noPrefetch = noPrefetch + } + + async _openp() { + this._blockMap = await blockMap.get(this.core, this.id) + + const [startIndex, startOffset, endIndex, endLength] = seekBlockMap( + this._blockMap, + this._rangeStart, + this._rangeEnd + ) + + this._startIndex = startIndex + this._startOffset = startOffset + this._endIndex = endIndex + this._endOffset = endLength + + if (this._endIndex === -1) { + this._endIndex = this._blockMap.blocks.length + this._endOffset = 0 + } + } + + _open(cb) { + this._openp().then(cb, cb) + } + + _predestroy() { + if (this._range) this._range.destroy() + this.core.close().then(noop, noop) + } + + _destroy(cb) { + if (this._range) this._range.destroy() + this.core.close().then(cb, cb) + } + + _prefetch(index) { + const blocks = [] + for (; index < this._endIndex; index++) 
blocks.push(this._blockMap.blocks[index].index) + this._range = this.core.download({ blocks }) + } + + async _readp() { + if (this._startIndex >= this._endIndex) { + this.push(null) + return + } + + let block = null + + const index = this._startIndex++ + const b = this._blockMap.blocks[index] + + if (!this._range && !this._noPrefetch) { + block = await this.core.get(b.index, { wait: false }) + if (!block) this._prefetch(index) + } + + if (!block) { + block = await this.core.get(b.index) + } + + if (!block) throw BLOCK_NOT_AVAILABLE() + + if (this._startOffset) { + block = block.subarray(this._startOffset) + this._startOffset = 0 + } + + if (this._startIndex === this._endIndex && this._endOffset) { + block = block.subarray(0, block.byteLength - this._endOffset) + } + + this.push(block) + } + + _read(cb) { + this._readp().then(cb, cb) + } } @@ -87,28 +242,26 @@ class BlobReadStream extends Readable { this._bytesRead = 0 } - _open(cb) { + async _openp() { if (this._pos === this.id.byteOffset) { this._index = this.id.blockOffset this._relativeOffset = 0 - return cb(null) + return } - this.core - .seek(this._pos, { - start: this.id.blockOffset, - end: this.id.blockOffset + this.id.blockLength - }) - .then( - (result) => { - if (!result) return cb(BLOCK_NOT_AVAILABLE()) + const result = await this.core.seek(this._pos, { + start: this.id.blockOffset, + end: this.id.blockOffset + this.id.blockLength + }) + + if (!result) throw BLOCK_NOT_AVAILABLE() + + this._index = result[0] + this._relativeOffset = result[1] + } - this._index = result[0] - this._relativeOffset = result[1] - return cb(null) - }, - (err) => cb(err) - ) + _open(cb) { + this._openp().then(cb, cb) } _predestroy() { @@ -121,39 +274,70 @@ class BlobReadStream extends Readable { this.core.close().then(cb, cb) } - _read(cb) { + async _readp() { if (this._pos >= this._end) { this.push(null) - return cb(null) + return } if (this._prefetch) this._prefetch.update(this._index) - this.core.get(this._index).then( - (block) => { 
- if (!block) return cb(BLOCK_NOT_AVAILABLE()) + let block = await this.core.get(this._index) + if (!block) throw BLOCK_NOT_AVAILABLE() - const remainder = this._end - this._pos - if (this._relativeOffset || remainder < block.length) { - block = block.subarray(this._relativeOffset, this._relativeOffset + remainder) - } + const remainder = this._end - this._pos + if (this._relativeOffset || remainder < block.length) { + block = block.subarray(this._relativeOffset, this._relativeOffset + remainder) + } - this._index++ - this._relativeOffset = 0 - this._pos += block.length - this._bytesRead += block.length + this._index++ + this._relativeOffset = 0 + this._pos += block.length + this._bytesRead += block.length - this.push(block) - return cb(null) - }, - (err) => cb(err) - ) + this.push(block) + } + + _read(cb) { + this._readp().then(cb, cb) } } module.exports = { + BlockMapReadStream, BlobReadStream, BlobWriteStream } function noop() {} + +function seekBlockMap(map, start, end) { + let s = -1 + let so = -1 + let e = -1 + let eo = -1 + + for (let i = 0; i < map.blocks.length; i++) { + const b = map.blocks[i] + + if (s === -1) { + if (start < b.byteLength) { + s = i + so = start + } else { + start -= b.byteLength + } + } + + if (e === -1 && end > -1) { + if (end <= b.byteLength) { + e = i + 1 + eo = b.byteLength - end + } else { + end -= b.byteLength + } + } + } + + return [s, so, e, eo] +} diff --git a/package.json b/package.json index 409ff26..89ccdea 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,8 @@ "dependencies": { "b4a": "^1.6.1", "bare-events": "^2.5.0", + "compact-encoding": "^2.18.0", + "hypercore-crypto": "^3.6.1", "hypercore-errors": "^1.1.1", "mutexify": "^1.4.0", "speedometer": "^1.1.0", diff --git a/test/all.js b/test/all.js index 42701d1..cdf1731 100644 --- a/test/all.js +++ b/test/all.js @@ -323,7 +323,7 @@ test('upload/download can be monitored', async (t) => { const blobsB = new Hyperblobs(b) const bytes = 1024 * 100 // big enough to 
trigger more than one update event - const buf = Buffer.alloc(bytes, '0') + const buf = b4a.alloc(bytes, '0') const id = await blobsA.put(buf) // add another blob which should not be monitored @@ -381,7 +381,7 @@ test('monitor is removed from the Set on close', async (t) => { const blobs = await create(t) const bytes = 1024 * 100 // big enough to trigger more than one update event - const buf = Buffer.alloc(bytes, '0') + const buf = b4a.alloc(bytes, '0') const id = await blobs.put(buf) const monitor = blobs.monitor(id) t.teardown(() => monitor.close()) @@ -395,20 +395,87 @@ test('basic batch', async (t) => { const batch = blobs.batch() { - const id = await batch.put(Buffer.from('hello world')) + const id = await batch.put(b4a.from('hello world')) const buf = await batch.get(id) - t.alike(buf, Buffer.from('hello world')) + t.alike(buf, b4a.from('hello world')) } { - const id = await batch.put(Buffer.from('hej verden')) + const id = await batch.put(b4a.from('hej verden')) const buf = await batch.get(id) - t.alike(buf, Buffer.from('hej verden')) + t.alike(buf, b4a.from('hej verden')) } await batch.flush() }) +test('basic block map', async (t) => { + const blobs = await create(t) + + { + const id = await blobs.put(b4a.from('hello world'), { blockMap: true }) + const blob = await blobs.get(id) + t.alike(blob, b4a.from('hello world')) + } + + { + const id = await blobs.put(b4a.alloc(1024 * 1024), { blockMap: true }) + const blob = await blobs.get(id) + t.alike(blob, b4a.alloc(1024 * 1024)) + } + + { + const buf = b4a.alloc(1024 * 1024, '0123456789') + const id = await blobs.put(buf, { blockMap: true }) + + { + const blob = await blobs.get(id, { start: 0, end: 0 }) + t.alike(blob, b4a.from('0')) + } + + { + const blob = await blobs.get(id, { start: 0, end: 1 }) + t.alike(blob, b4a.from('01')) + } + + { + const blob = await blobs.get(id, { start: 1, end: 1 }) + t.alike(blob, b4a.from('1')) + } + + { + const blob = await blobs.get(id, { start: 60000, length: 10000 }) + 
t.alike(blob, buf.subarray(60000, 70000)) + } + + { + const blob = await blobs.get(id, { start: 1, end: buf.byteLength - 2 }) + t.alike(blob, buf.subarray(1, buf.byteLength - 1)) + } + } +}) + +test('block map dedup', async (t) => { + const blobs = await create(t) + + const buf = b4a.alloc(1024 * 1024 * 3 + 1) + const buf2 = b4a.alloc(1024 * 1024 * 3 + 2) + const id = await blobs.put(buf, { blockMap: true, dedup: true }) + + { + const blob = await blobs.get(id) + t.alike(blob, buf) + } + + { + const n = await blobs.put(buf2, { blob: id, dedup: true }) + const blob = await blobs.get(n) + t.alike(blob, buf2) + } + + t.is(blobs.core.length, 5) +}) + async function createPair(t) { const a = new Hypercore(await t.tmp()) await a.ready()