diff --git a/builder/extension/messages.js b/builder/extension/messages.js new file mode 100644 index 0000000..92d4437 --- /dev/null +++ b/builder/extension/messages.js @@ -0,0 +1,80 @@ +const Hyperschema = require('hyperschema') + +const SCHEMA_DIR = './spec/hyperschema' + +const schema = Hyperschema.from(SCHEMA_DIR) +const ns = schema.namespace('hyperdb-extension') + +ns.register({ + name: 'range', + compact: true, + fields: [ + { + name: 'gt', + type: 'buffer' + }, + { + name: 'gte', + type: 'buffer' + }, + { + name: 'lt', + type: 'buffer' + }, + { + name: 'lte', + type: 'buffer' + }, + { + name: 'reverse', + type: 'bool' + }, + { + name: 'limit', + type: 'int' + } + ] +}) + +ns.register({ + name: 'message', + compact: true, + fields: [ + { + name: 'type', + type: 'uint', + required: true + }, + { + name: 'version', + type: 'uint' + }, + { + name: 'collectionName', + type: 'string' + }, + { + name: 'range', + type: '@hyperdb-extension/range' + }, + { + name: 'query', + type: 'buffer' + }, + { + name: 'blocks', + type: 'uint', + array: true + }, + { + name: 'start', + type: 'uint' + }, + { + name: 'end', + type: 'uint' + } + ] +}) + +Hyperschema.toDisk(schema) diff --git a/index.js b/index.js index 3a38fb5..7d3756d 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ const b4a = require('b4a') // engines const RocksEngine = require('./lib/engine/rocks') const BeeEngine = require('./lib/engine/bee') +const HyperDBExtension = require('./lib/extension') let compareHasDups = false @@ -224,7 +225,8 @@ class HyperDB { updates = new Updates(0, []), rootInstance = null, writable = true, - context = null + context = null, + extension = null } = {}) { this.version = version this.context = context @@ -236,6 +238,8 @@ class HyperDB { this.watchers = null this.closing = null + this.extension = extension !== false ? extension || HyperDBExtension.register(this) : false + engine.refs++ } @@ -249,10 +253,9 @@ class HyperDB { } static bee (core, definition, options = {}) { - const extension = options.extension const autoUpdate = !!options.autoUpdate - const db = new HyperDB(new BeeEngine(core, { extension }), definition, options) + const db = new HyperDB(new BeeEngine(core, { extension: false }), definition, options) if (autoUpdate) { const update = db.update.bind(db) @@ -382,13 +385,17 @@ class HyperDB { const { checkout = -1, - limit, + limit = -1, reverse = false } = query - const range = index === null - ? collection.encodeKeyRange(query) - : index.encodeKeyRange(query) + // encoded flag comes from lib/extension.js + const range = options?.encoded ? query : encodeQuery(query) + function encodeQuery (query) { + return index === null + ? collection.encodeKeyRange(query) + : index.encodeKeyRange(query) + } const overlay = checkout !== -1 ? [] @@ -396,13 +403,26 @@ class HyperDB { ? this.updates.collectionOverlay(collection, range, reverse) : this.updates.indexOverlay(index, range, reverse) + let completed = false + const extension = this.extension && options?.extension !== false ? this.extension : false + const onwait = extension && function (_, core) { + if (!extension || completed) return + extension.get(core.length, indexName, { ...range, limit, reverse }) + completed = true + } + return new IndexStream(this, range, { index, collection, reverse, limit, overlay, - checkout + checkout, + onwait, + wait: options?.wait, + update: options?.update, + onseq: options?.onseq, + extension }) } @@ -422,34 +442,39 @@ class HyperDB { return u !== null } - async get (collectionName, doc, { checkout = -1 } = {}) { + async get (collectionName, doc, opts = {}) { maybeClosed(this) + const checkout = opts.checkout || -1 + if (this.extension && opts.extension !== false) opts.extension = this.extension + const snap = this.engineSnapshot.ref() try { const collection = this.definition.resolveCollection(collectionName) - if (collection !== null) return await this._getCollection(collection, snap, doc, checkout) + if (collection !== null) return await this._getCollection(collection, snap, doc, checkout, { ...opts, collectionName: collection.name }) const index = this.definition.resolveIndex(collectionName) if (index === null) throw new Error('Unknown index or collection: ' + collectionName) - const key = index.encodeKey(doc, this.context) + // doc is a buf when it comes from lib/extension.js + const key = b4a.isBuffer(doc) ? doc : index.encodeKey(doc, this.context) if (key === null) return null const u = this.updates.getIndex(index, key) if (u !== null && checkout === -1) return u.value === null ? null : index.collection.reconstruct(this.version, u.key, u.value) - const value = await snap.get(key, checkout) + opts.collectionName = index.name + const value = await snap.get(key, checkout, opts) if (value === null) return null - return this._getCollection(index.collection, snap, index.reconstruct(key, value), checkout) + return this._getCollection(index.collection, snap, index.reconstruct(key, value), checkout, opts) } finally { if (snap !== null) snap.unref() } } - async _getCollection (collection, snap, doc, checkout) { + async _getCollection (collection, snap, doc, checkout, opts = {}) { maybeClosed(this) // we allow passing the raw primary key here cause thats what the trigger passes for simplicity @@ -457,7 +482,7 @@ class HyperDB { const key = b4a.isBuffer(doc) ? doc : collection.encodeKey(doc) const u = this.updates.get(key) - const value = (u !== null && checkout === -1) ? u.value : await snap.get(key, checkout) + const value = (u !== null && checkout === -1) ? u.value : await snap.get(key, checkout, opts) return value === null ? null : collection.reconstruct(this.version, key, value) } diff --git a/lib/engine/bee.js b/lib/engine/bee.js index b986d98..8fcfbdf 100644 --- a/lib/engine/bee.js +++ b/lib/engine/bee.js @@ -53,8 +53,13 @@ class BeeSnapshot { return Promise.all(promises) } - get (key, checkout) { - return getValue(this.snapshot, key, checkout) + get (key, checkout, opts = {}) { + const onwait = opts.extension && function (_, core) { + options.onwait = null + if (opts.extension) opts.extension.get(core.length, opts.collectionName, null, key) + } + const options = { ...opts, onwait } + return getValue(this.snapshot, key, checkout, options) } createReadStream (range, options) { @@ -185,8 +190,9 @@ async function getWrapped (db, key, value, checkout) { return { key, value: [value, await getValue(db, value, checkout)] } } -async function getValue (db, key, checkout) { - const node = await db.get(key, { checkout }) +async function getValue (db, key, checkout, opts = {}) { + opts.checkout = checkout + const node = await db.get(key, opts) return node === null ? null : node.value } diff --git a/lib/extension.js b/lib/extension.js new file mode 100644 index 0000000..a12ab2a --- /dev/null +++ b/lib/extension.js @@ -0,0 +1,103 @@ +const { encode, decode } = require('../spec/hyperschema') + +const encodeMessage = (obj) => encode('@hyperdb-extension/message', obj) +const decodeMessage = (buf) => decode('@hyperdb-extension/message', buf) + +const MESSAGE_TYPE = { + GET: 1, + CACHE: 2 +} + +const FLUSH_BATCH = 128 + +class Batch { + constructor (outgoing, from) { + this.blocks = [] + this.start = 0 + this.end = 0 + this.outgoing = outgoing + this.from = from + } + + push (seq) { + const len = this.blocks.push(seq) + if (len === 1 || seq < this.start) this.start = seq + if (len === 1 || seq >= this.end) this.end = seq + 1 + if (len >= FLUSH_BATCH) { + this.send() + this.clear() + } + } + + send () { + if (!this.blocks.length) return + this.outgoing.send(encodeMessage({ type: MESSAGE_TYPE.CACHE, blocks: this.blocks, start: this.start, end: this.end }), this.from) + } + + clear () { + this.start = this.end = 0 + this.blocks = [] + } +} + +class HyperDBExtension { + constructor (db) { + this.encoding = null + this.db = db + this.outgoing = null + this.active = 0 + } + + get (version, collectionName, range, query) { + const message = encodeMessage({ + type: MESSAGE_TYPE.GET, + version, + collectionName, + range, + query + }) + this.outgoing.broadcast(message) + } + + onmessage (buf, from) { + const message = decodeMessage(buf) + if (!message) return + if (message.type === MESSAGE_TYPE.GET) this.onget(message, from) + if (message.type === MESSAGE_TYPE.CACHE) this.oncache(message, from) + } + + oncache (message, from) { + if (!message.blocks.length) return + const { blocks, start, end } = message + this.db.engine.core.download({ blocks, start, end }) + } + + onget (message, from) { + if (!this.db.engineSnapshot) return + const version = this.db.engineSnapshot.snapshot.core.length + if (!message.version || message.version > version) return + + const b = new Batch(this.outgoing, from) + + const options = { checkout: message.version, extension: false, wait: false, update: false, onseq, encoded: true } + if (message.range) this.db.find(message.collectionName, message.range, options).toArray().then(done, done) + else this.db.get(message.collectionName, message.query, options).then(done, done) + + function done () { + b.send() + } + + function onseq (seq) { + b.push(seq) + } + } + + static register (db) { + if (!db.core) return null + const e = new this(db) + e.outgoing = db.core.registerExtension('hyperdb-extension', e) + return e + } +} + +module.exports = HyperDBExtension diff --git a/lib/stream.js b/lib/stream.js index 88be168..8ec96dc 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -2,7 +2,7 @@ const { Readable, isEnded, getStreamError } = require('streamx') const b4a = require('b4a') module.exports = class IndexStream extends Readable { - constructor (db, range, { index = null, collection = index.collection, reverse = false, limit = -1, overlay = [], checkout = 0 }) { + constructor (db, range, { index = null, collection = index.collection, reverse = false, limit = -1, overlay = [], checkout = 0, onwait, wait, update, onseq }) { super() this.version = db.version @@ -10,7 +10,7 @@ module.exports = class IndexStream extends Readable { this.snapshot = db.engineSnapshot this.collection = collection this.index = index - this.stream = this.snapshot.createReadStream(range, { reverse, limit, checkout }) + this.stream = this.snapshot.createReadStream(range, { reverse, limit, checkout, wait, update, onseq, onwait }) this.streamClosed = false this.overlay = overlay this.overlayIndex = 0 diff --git a/package.json b/package.json index 3e7c5cd..2cc91db 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,8 @@ "runtime.mjs", "builder/*.js", "builder/*.mjs", - "lib/**" + "lib/**", + "spec/**" ], "scripts": { "test": "standard && brittle test/*.js", diff --git a/spec/hyperschema/index.js b/spec/hyperschema/index.js new file mode 100644 index 0000000..396512c --- /dev/null +++ b/spec/hyperschema/index.js @@ -0,0 +1,157 @@ +// This file is autogenerated by the hyperschema compiler +// Schema Version: 1 +/* eslint-disable camelcase */ +/* eslint-disable quotes */ + +const VERSION = 1 +const { c } = require('hyperschema/runtime') + +// eslint-disable-next-line no-unused-vars +let version = VERSION + +// @hyperdb-extension/range +const encoding0 = { + preencode (state, m) { + state.end++ // max flag is 32 so always one byte + + if (m.gt) c.buffer.preencode(state, m.gt) + if (m.gte) c.buffer.preencode(state, m.gte) + if (m.lt) c.buffer.preencode(state, m.lt) + if (m.lte) c.buffer.preencode(state, m.lte) + if (m.limit) c.int.preencode(state, m.limit) + }, + encode (state, m) { + const flags = + (m.gt ? 1 : 0) | + (m.gte ? 2 : 0) | + (m.lt ? 4 : 0) | + (m.lte ? 8 : 0) | + (m.reverse ? 16 : 0) | + (m.limit ? 32 : 0) + + c.uint.encode(state, flags) + + if (m.gt) c.buffer.encode(state, m.gt) + if (m.gte) c.buffer.encode(state, m.gte) + if (m.lt) c.buffer.encode(state, m.lt) + if (m.lte) c.buffer.encode(state, m.lte) + if (m.limit) c.int.encode(state, m.limit) + }, + decode (state) { + const flags = c.uint.decode(state) + + return { + gt: (flags & 1) !== 0 ? c.buffer.decode(state) : null, + gte: (flags & 2) !== 0 ? c.buffer.decode(state) : null, + lt: (flags & 4) !== 0 ? c.buffer.decode(state) : null, + lte: (flags & 8) !== 0 ? c.buffer.decode(state) : null, + reverse: (flags & 16) !== 0, + limit: (flags & 32) !== 0 ? c.int.decode(state) : 0 + } + } +} + +// @hyperdb-extension/message.blocks +const encoding1_5 = c.array(c.uint) + +// @hyperdb-extension/message +const encoding1 = { + preencode (state, m) { + c.uint.preencode(state, m.type) + state.end++ // max flag is 64 so always one byte + + if (m.version) c.uint.preencode(state, m.version) + if (m.collectionName) c.string.preencode(state, m.collectionName) + if (m.range) encoding0.preencode(state, m.range) + if (m.query) c.buffer.preencode(state, m.query) + if (m.blocks) encoding1_5.preencode(state, m.blocks) + if (m.start) c.uint.preencode(state, m.start) + if (m.end) c.uint.preencode(state, m.end) + }, + encode (state, m) { + const flags = + (m.version ? 1 : 0) | + (m.collectionName ? 2 : 0) | + (m.range ? 4 : 0) | + (m.query ? 8 : 0) | + (m.blocks ? 16 : 0) | + (m.start ? 32 : 0) | + (m.end ? 64 : 0) + + c.uint.encode(state, m.type) + c.uint.encode(state, flags) + + if (m.version) c.uint.encode(state, m.version) + if (m.collectionName) c.string.encode(state, m.collectionName) + if (m.range) encoding0.encode(state, m.range) + if (m.query) c.buffer.encode(state, m.query) + if (m.blocks) encoding1_5.encode(state, m.blocks) + if (m.start) c.uint.encode(state, m.start) + if (m.end) c.uint.encode(state, m.end) + }, + decode (state) { + const r0 = c.uint.decode(state) + const flags = c.uint.decode(state) + + return { + type: r0, + version: (flags & 1) !== 0 ? c.uint.decode(state) : 0, + collectionName: (flags & 2) !== 0 ? c.string.decode(state) : null, + range: (flags & 4) !== 0 ? encoding0.decode(state) : null, + query: (flags & 8) !== 0 ? c.buffer.decode(state) : null, + blocks: (flags & 16) !== 0 ? encoding1_5.decode(state) : 0, + start: (flags & 32) !== 0 ? c.uint.decode(state) : 0, + end: (flags & 64) !== 0 ? c.uint.decode(state) : 0 + } + } +} + +function setVersion (v) { + version = v +} + +function encode (name, value, v = VERSION) { + version = v + return c.encode(getEncoding(name), value) +} + +function decode (name, buffer, v = VERSION) { + version = v + return c.decode(getEncoding(name), buffer) +} + +function getEnum (name) { + switch (name) { + default: throw new Error('Enum not found ' + name) + } +} + +function getEncoding (name) { + switch (name) { + case '@hyperdb-extension/range': return encoding0 + case '@hyperdb-extension/message': return encoding1 + default: throw new Error('Encoder not found ' + name) + } +} + +function getStruct (name, v = VERSION) { + const enc = getEncoding(name) + return { + preencode (state, m) { + version = v + enc.preencode(state, m) + }, + encode (state, m) { + version = v + enc.encode(state, m) + }, + decode (state) { + version = v + return enc.decode(state) + } + } +} + +const resolveStruct = getStruct // compat + +module.exports = { resolveStruct, getStruct, getEnum, getEncoding, encode, decode, setVersion, version } diff --git a/spec/hyperschema/schema.json b/spec/hyperschema/schema.json new file mode 100644 index 0000000..b5d799d --- /dev/null +++ b/spec/hyperschema/schema.json @@ -0,0 +1,93 @@ +{ + "version": 1, + "schema": [ + { + "name": "range", + "namespace": "hyperdb-extension", + "compact": true, + "flagsPosition": 0, + "fields": [ + { + "name": "gt", + "type": "buffer", + "version": 1 + }, + { + "name": "gte", + "type": "buffer", + "version": 1 + }, + { + "name": "lt", + "type": "buffer", + "version": 1 + }, + { + "name": "lte", + "type": "buffer", + "version": 1 + }, + { + "name": "reverse", + "type": "bool", + "version": 1 + }, + { + "name": "limit", + "type": "int", + "version": 1 + } + ] + }, + { + "name": "message", + "namespace": "hyperdb-extension", + "compact": true, + "flagsPosition": 1, + "fields": [ + { + "name": "type", + "required": true, + "type": "uint", + "version": 1 + }, + { + "name": "version", + "type": "uint", + "version": 1 + }, + { + "name": "collectionName", + "type": "string", + "version": 1 + }, + { + "name": "range", + "type": "@hyperdb-extension/range", + "version": 1 + }, + { + "name": "query", + "type": "buffer", + "version": 1 + }, + { + "name": "blocks", + "array": true, + "type": "uint", + "version": 1 + }, + { + "name": "start", + "type": "uint", + "version": 1 + }, + { + "name": "end", + "type": "uint", + "version": 1 + } + ] + } + ] +} \ No newline at end of file diff --git a/test/extension.js b/test/extension.js new file mode 100644 index 0000000..b86e20f --- /dev/null +++ b/test/extension.js @@ -0,0 +1,109 @@ +const b4a = require('b4a') +const definition = require('./fixtures/definition') +const { test, replicate } = require('./helpers') + +test.bee('basic extension test', async function ({ create }, t) { + t.plan(6) + const db = await create(definition) + + await db.insert('members', { id: 'user', age: 1 }) + await db.insert('members', { id: 'user2', age: 2 }) + await db.insert('members', { id: 'user3', age: 3 }) + await db.flush() + + const clone = await create(definition, { key: db.core.key }) + clone.core.on('append', () => clone.update()) + + replicate(t, clone, db) + + await waitLengthMatch(db.core, clone.core) + + let q = null + { + const og = clone.extension.get.bind(clone.extension) + clone.extension.get = (version, collectionName, range, query) => { + q = query + t.is(collectionName, 'members') + t.absent(range) + t.ok(query) + og(version, collectionName, range, query) + } + } + + { + const og = db.extension.onget.bind(db.extension) + db.extension.onget = (message, from) => { + t.ok(b4a.equals(message.query, q)) + t.is(message.collectionName, 'members') + og(message, from) + } + } + + await db.insert('members', { id: 'user4', age: 3 }) + await db.flush() + await waitLengthMatch(db.core, clone.core) + + t.ok(await clone.get('members', { id: 'user4' })) + + t.teardown(async () => { + await db.close() + await clone.close() + }) +}) + +test.bee('basic extension range test', async function ({ create }, t) { + t.plan(9) + const db = await create(definition) + + await db.insert('members', { id: 'user', age: 1 }) + await db.insert('members', { id: 'user2', age: 2 }) + await db.insert('members', { id: 'user3', age: 3 }) + await db.flush() + + const clone = await create(definition, { key: db.core.key }) + clone.core.on('append', () => clone.update()) + + replicate(t, clone, db) + + await waitLengthMatch(db.core, clone.core) + + let r = null + { + const og = clone.extension.get.bind(clone.extension) + clone.extension.get = (version, collectionName, range, query) => { + r = range + t.is(collectionName, 'members/by-age') + t.ok(range) + t.absent(query) + og(version, collectionName, range, query) + } + } + + { + const og = db.extension.onget.bind(db.extension) + db.extension.onget = (message, from) => { + t.ok(b4a.equals(message.range.gte, r.gte)) + t.ok(b4a.equals(message.range.lte, r.lte)) + t.ok(message.range.reverse === r.reverse) + t.ok(message.range.limit === r.limit) + t.is(message.collectionName, 'members/by-age') + og(message, from) + } + } + + await db.insert('members', { id: 'user4', age: 3 }) + await db.flush() + await waitLengthMatch(db.core, clone.core) + + const arr = await clone.find('members/by-age', { gte: { age: 1 }, lte: { age: 3 } }).toArray() + t.is(arr.length, 4) + + t.teardown(async () => { + await db.close() + await clone.close() + }) +}) + +async function waitLengthMatch (a, b) { + while (a.length !== b.length) await new Promise(resolve => setTimeout(resolve, 100)) +}