Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions builder/extension/messages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const Hyperschema = require('hyperschema')

const SCHEMA_DIR = './spec/hyperschema'

const schema = Hyperschema.from(SCHEMA_DIR)
const ns = schema.namespace('hyperdb-extension')

ns.register({
name: 'range',
compact: true,
fields: [
{
name: 'gt',
type: 'buffer'
},
{
name: 'gte',
type: 'buffer'
},
{
name: 'lt',
type: 'buffer'
},
{
name: 'lte',
type: 'buffer'
},
{
name: 'reverse',
type: 'bool'
},
{
name: 'limit',
type: 'int'
}
]
})

ns.register({
name: 'message',
compact: true,
fields: [
{
name: 'type',
type: 'uint',
required: true
},
{
name: 'version',
type: 'uint'
},
{
name: 'collectionName',
type: 'string'
},
{
name: 'range',
type: '@hyperdb-extension/range'
},
{
name: 'query',
type: 'buffer'
},
{
name: 'blocks',
type: 'uint',
array: true
},
{
name: 'start',
type: 'uint'
},
{
name: 'end',
type: 'uint'
}
]
})

Hyperschema.toDisk(schema)
55 changes: 40 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const b4a = require('b4a')
// engines
const RocksEngine = require('./lib/engine/rocks')
const BeeEngine = require('./lib/engine/bee')
const HyperDBExtension = require('./lib/extension')

let compareHasDups = false

Expand Down Expand Up @@ -224,7 +225,8 @@ class HyperDB {
updates = new Updates(0, []),
rootInstance = null,
writable = true,
context = null
context = null,
extension = null
} = {}) {
this.version = version
this.context = context
Expand All @@ -236,6 +238,8 @@ class HyperDB {
this.watchers = null
this.closing = null

this.extension = extension !== false ? extension || HyperDBExtension.register(this) : false

engine.refs++
}

Expand All @@ -249,10 +253,9 @@ class HyperDB {
}

static bee (core, definition, options = {}) {
const extension = options.extension
const autoUpdate = !!options.autoUpdate

const db = new HyperDB(new BeeEngine(core, { extension }), definition, options)
const db = new HyperDB(new BeeEngine(core, { extension: false }), definition, options)

if (autoUpdate) {
const update = db.update.bind(db)
Expand Down Expand Up @@ -382,27 +385,44 @@ class HyperDB {

const {
checkout = -1,
limit,
limit = -1,
reverse = false
} = query

const range = index === null
? collection.encodeKeyRange(query)
: index.encodeKeyRange(query)
// encoded flag comes from lib/extension.js
const range = options?.encoded ? query : encodeQuery(query)
function encodeQuery (query) {
return index === null
? collection.encodeKeyRange(query)
: index.encodeKeyRange(query)
}

const overlay = checkout !== -1
? []
: index === null
? this.updates.collectionOverlay(collection, range, reverse)
: this.updates.indexOverlay(index, range, reverse)

let completed = false
const extension = this.extension && options?.extension !== false ? this.extension : false
const onwait = extension && function (_, core) {
if (!extension || completed) return
extension.get(core.length, indexName, { ...range, limit, reverse })
completed = true
}

return new IndexStream(this, range, {
index,
collection,
reverse,
limit,
overlay,
checkout
checkout,
onwait,
wait: options?.wait,
update: options?.update,
onseq: options?.onseq,
extension
})
}

Expand All @@ -422,42 +442,47 @@ class HyperDB {
return u !== null
}

async get (collectionName, doc, { checkout = -1 } = {}) {
async get (collectionName, doc, opts = {}) {
maybeClosed(this)

const checkout = opts.checkout || -1
if (this.extension && opts.extension !== false) opts.extension = this.extension

const snap = this.engineSnapshot.ref()

try {
const collection = this.definition.resolveCollection(collectionName)
if (collection !== null) return await this._getCollection(collection, snap, doc, checkout)
if (collection !== null) return await this._getCollection(collection, snap, doc, checkout, { ...opts, collectionName: collection.name })

const index = this.definition.resolveIndex(collectionName)
if (index === null) throw new Error('Unknown index or collection: ' + collectionName)

const key = index.encodeKey(doc, this.context)
// doc is a buf when it comes from lib/extension.js
const key = b4a.isBuffer(doc) ? doc : index.encodeKey(doc, this.context)
if (key === null) return null

const u = this.updates.getIndex(index, key)
if (u !== null && checkout === -1) return u.value === null ? null : index.collection.reconstruct(this.version, u.key, u.value)

const value = await snap.get(key, checkout)
opts.collectionName = index.name
const value = await snap.get(key, checkout, opts)
if (value === null) return null

return this._getCollection(index.collection, snap, index.reconstruct(key, value), checkout)
return this._getCollection(index.collection, snap, index.reconstruct(key, value), checkout, opts)
} finally {
if (snap !== null) snap.unref()
}
}

async _getCollection (collection, snap, doc, checkout) {
async _getCollection (collection, snap, doc, checkout, opts = {}) {
maybeClosed(this)

// we allow passing the raw primary key here cause thats what the trigger passes for simplicity
// you shouldnt rely on that.
const key = b4a.isBuffer(doc) ? doc : collection.encodeKey(doc)

const u = this.updates.get(key)
const value = (u !== null && checkout === -1) ? u.value : await snap.get(key, checkout)
const value = (u !== null && checkout === -1) ? u.value : await snap.get(key, checkout, opts)

return value === null ? null : collection.reconstruct(this.version, key, value)
}
Expand Down
14 changes: 10 additions & 4 deletions lib/engine/bee.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ class BeeSnapshot {
return Promise.all(promises)
}

get (key, checkout) {
return getValue(this.snapshot, key, checkout)
get (key, checkout, opts = {}) {
const onwait = opts.extension && function (_, core) {
options.onwait = null
if (opts.extension) opts.extension.get(core.length, opts.collectionName, null, key)
}
const options = { ...opts, onwait }
return getValue(this.snapshot, key, checkout, options)
}

createReadStream (range, options) {
Expand Down Expand Up @@ -185,8 +190,9 @@ async function getWrapped (db, key, value, checkout) {
return { key, value: [value, await getValue(db, value, checkout)] }
}

async function getValue (db, key, checkout) {
const node = await db.get(key, { checkout })
async function getValue (db, key, checkout, opts = {}) {
opts.checkout = checkout
const node = await db.get(key, opts)
return node === null ? null : node.value
}

Expand Down
103 changes: 103 additions & 0 deletions lib/extension.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const { encode, decode } = require('../spec/hyperschema')

const encodeMessage = (obj) => encode('@hyperdb-extension/message', obj)
const decodeMessage = (buf) => decode('@hyperdb-extension/message', buf)

const MESSAGE_TYPE = {
GET: 1,
CACHE: 2
}

const FLUSH_BATCH = 128

class Batch {
constructor (outgoing, from) {
this.blocks = []
this.start = 0
this.end = 0
this.outgoing = outgoing
this.from = from
}

push (seq) {
const len = this.blocks.push(seq)
if (len === 1 || seq < this.start) this.start = seq
if (len === 1 || seq >= this.end) this.end = seq + 1
if (len >= FLUSH_BATCH) {
this.send()
this.clear()
}
}

send () {
if (!this.blocks.length) return
this.outgoing.send(encodeMessage({ type: MESSAGE_TYPE.CACHE, blocks: this.blocks, start: this.start, end: this.end }), this.from)
}

clear () {
this.start = this.end = 0
this.blocks = []
}
}

class HyperDBExtension {
constructor (db) {
this.encoding = null
this.db = db
this.outgoing = null
this.active = 0
}

get (version, collectionName, range, query) {
const message = encodeMessage({
type: MESSAGE_TYPE.GET,
version,
collectionName,
range,
query
})
this.outgoing.broadcast(message)
}

onmessage (buf, from) {
const message = decodeMessage(buf)
if (!message) return
if (message.type === MESSAGE_TYPE.GET) this.onget(message, from)
if (message.type === MESSAGE_TYPE.CACHE) this.oncache(message, from)
}

oncache (message, from) {
if (!message.blocks.length) return
const { blocks, start, end } = message
this.db.engine.core.download({ blocks, start, end })
}

onget (message, from) {
if (!this.db.engineSnapshot) return
const version = this.db.engineSnapshot.snapshot.core.length
if (!message.version || message.version > version) return

const b = new Batch(this.outgoing, from)

const options = { checkout: message.version, extension: false, wait: false, update: false, onseq, encoded: true }
if (message.range) this.db.find(message.collectionName, message.range, options).toArray().then(done, done)
else this.db.get(message.collectionName, message.query, options).then(done, done)

function done () {
b.send()
}

function onseq (seq) {
b.push(seq)
}
}

static register (db) {
if (!db.core) return null
const e = new this(db)
e.outgoing = db.core.registerExtension('hyperdb-extension', e)
return e
}
}

module.exports = HyperDBExtension
4 changes: 2 additions & 2 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ const { Readable, isEnded, getStreamError } = require('streamx')
const b4a = require('b4a')

module.exports = class IndexStream extends Readable {
constructor (db, range, { index = null, collection = index.collection, reverse = false, limit = -1, overlay = [], checkout = 0 }) {
constructor (db, range, { index = null, collection = index.collection, reverse = false, limit = -1, overlay = [], checkout = 0, onwait, wait, update, onseq }) {
super()

this.version = db.version
this.engine = db.engine
this.snapshot = db.engineSnapshot
this.collection = collection
this.index = index
this.stream = this.snapshot.createReadStream(range, { reverse, limit, checkout })
this.stream = this.snapshot.createReadStream(range, { reverse, limit, checkout, wait, update, onseq, onwait })
this.streamClosed = false
this.overlay = overlay
this.overlayIndex = 0
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"runtime.mjs",
"builder/*.js",
"builder/*.mjs",
"lib/**"
"lib/**",
"spec/**"
],
"scripts": {
"test": "standard && brittle test/*.js",
Expand Down
Loading