Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
715992b
fix: a series of issues
Saviio Mar 28, 2026
55ef6e1
feat: advance live query trace and planner optimizations
Saviio Mar 29, 2026
687a33b
perf(trace): compile bootstrap executor and skip covered filters
Saviio Mar 29, 2026
729ada4
perf(trace): compile fused update pipeline
Saviio Mar 29, 2026
8f8d696
perf(storage): optimize batch gin inserts
Saviio Mar 29, 2026
39acbab
perf(storage): batch gin builder flush and compile gin paths
Saviio Mar 29, 2026
80aea37
perf(core): improve trace runtime and gin ingest hot paths
Saviio Mar 29, 2026
4bad52c
perf(storage): reduce init gin bulk insert overhead
Saviio Mar 29, 2026
7aac9b6
fix(benchmark): stabilize live query mutation targeting
Saviio Mar 30, 2026
5c64efc
perf(graphql): reduce live subscription rebuild overhead
Saviio Mar 30, 2026
727e736
feat(graphql): widen delta live planning for unique reverse limit
Saviio Mar 30, 2026
155f810
feat(graphql): add relation-aware live root filters
Saviio Mar 30, 2026
79f3b56
perf(graphql): profile snapshot lane and reuse root payload cache
Saviio Mar 30, 2026
d14199d
perf(graphql): accelerate snapshot full-payload subscriptions
Saviio Mar 30, 2026
2f3e10e
fix(snapshot): preserve root-subset refresh across deletes
Saviio Mar 30, 2026
0190aaf
refactor(snapshot): unify root-subset variant selection
Saviio Mar 30, 2026
170f117
perf(graphql): smooth encode cache pruning
Saviio Mar 30, 2026
87ae796
perf(graphql): widen reverse relation windowed fetch
Saviio Mar 30, 2026
9af2045
fix(js): refresh wasm bindings after export renumbering
Saviio Mar 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion crates/binary/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn main() -> cynos_core::Result<()> {

- Use `SchemaLayout::from_projection()` for projected queries.
- Use `SchemaLayout::from_schemas()` when encoding joined rows.
- Enable the `wasm` feature if you want `wasm-bindgen` exports such as `BinaryResult::asView()`.
- Enable the `wasm` feature if you want `wasm-bindgen` exports such as `BinaryResult::asView()` and `BinaryResult::intoTransferable()`.

## License

Expand Down
8 changes: 8 additions & 0 deletions crates/binary/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ impl BinaryEncoder {

/// Encode a batch of rows
pub fn encode_rows(&mut self, rows: &[Rc<Row>]) {
self.encode_rows_iter(rows.iter());
}

/// Encode a batch of rows from any iterator over shared rows.
pub fn encode_rows_iter<'a, I>(&mut self, rows: I)
where
I: IntoIterator<Item = &'a Rc<Row>>,
{
// Reserve space for header (will be written at the end)
if self.buffer.is_empty() {
self.buffer.resize(HEADER_SIZE, 0);
Expand Down
9 changes: 9 additions & 0 deletions crates/binary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ impl BinaryResult {
js_sys::Uint8Array::from(&self.buffer[..])
}

/// Copy the buffer into a standalone Uint8Array suitable for `postMessage`
/// transfer lists and other ownership-taking APIs.
///
/// Unlike `asView()`, the returned bytes are no longer tied to WASM memory.
#[wasm_bindgen(js_name = intoTransferable)]
pub fn into_transferable(self) -> js_sys::Uint8Array {
js_sys::Uint8Array::from(&self.buffer[..])
}

/// Get a zero-copy Uint8Array view into WASM memory.
/// WARNING: This view becomes invalid if WASM memory grows or if this BinaryResult is freed.
/// The caller must ensure the BinaryResult outlives any use of the returned view.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ mod value;

pub use error::{Error, Result};
pub use row::{
next_row_id, reserve_row_ids, set_next_row_id, set_next_row_id_if_greater, Row, RowId,
DUMMY_ROW_ID,
aggregate_group_row_id, join_row_id, left_join_null_row_id, next_row_id, reserve_row_ids,
right_join_null_row_id, set_next_row_id, set_next_row_id_if_greater, Row, RowId, DUMMY_ROW_ID,
};
pub use types::DataType;
pub use value::{JsonbValue, Value};
116 changes: 116 additions & 0 deletions crates/core/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::value::Value;
use alloc::vec::Vec;
use core::hash::{Hash, Hasher};
use core::sync::atomic::{AtomicU64, Ordering};

/// Unique identifier for a row.
Expand All @@ -13,9 +14,63 @@ pub type RowId = u64;
/// (e.g., the result of joining two rows).
pub const DUMMY_ROW_ID: RowId = u64::MAX;

const ROW_ID_HASH_OFFSET: u64 = 0xCBF2_9CE4_8422_2325;
const ROW_ID_HASH_PRIME: u64 = 0x0000_0001_0000_01B3;
const JOIN_ROW_ID_DOMAIN: u64 = 0x4A4F_494E_5F49_4E4E;
const LEFT_JOIN_NULL_ROW_ID_DOMAIN: u64 = 0x4A4F_494E_5F4C_4E55;
const RIGHT_JOIN_NULL_ROW_ID_DOMAIN: u64 = 0x4A4F_494E_5F52_4E55;
const AGGREGATE_GROUP_ROW_ID_DOMAIN: u64 = 0x4147_475F_4752_4F55;

/// Global row ID counter for generating unique row IDs.
static NEXT_ROW_ID: AtomicU64 = AtomicU64::new(0);

struct RowIdHasher {
state: u64,
}

impl RowIdHasher {
#[inline]
fn new(domain: u64) -> Self {
Self {
state: ROW_ID_HASH_OFFSET ^ domain.rotate_left(7),
}
}
}

impl Hasher for RowIdHasher {
#[inline]
fn finish(&self) -> u64 {
self.state
}

#[inline]
fn write(&mut self, bytes: &[u8]) {
for &byte in bytes {
self.state ^= u64::from(byte);
self.state = self.state.wrapping_mul(ROW_ID_HASH_PRIME);
}
}
}

#[inline]
fn finalize_derived_row_id(id: u64) -> RowId {
if id == DUMMY_ROW_ID {
DUMMY_ROW_ID.wrapping_sub(1)
} else {
id
}
}

#[inline]
fn hash_row_id<F>(domain: u64, feed: F) -> RowId
where
F: FnOnce(&mut RowIdHasher),
{
let mut hasher = RowIdHasher::new(domain);
feed(&mut hasher);
finalize_derived_row_id(hasher.finish())
}

/// Gets the next unique row ID.
pub fn next_row_id() -> RowId {
NEXT_ROW_ID.fetch_add(1, Ordering::SeqCst)
Expand All @@ -37,6 +92,42 @@ pub fn set_next_row_id_if_greater(id: RowId) {
NEXT_ROW_ID.fetch_max(id, Ordering::SeqCst);
}

/// Derives a deterministic row ID for an inner join row.
#[inline]
pub fn join_row_id(left_id: RowId, right_id: RowId) -> RowId {
hash_row_id(JOIN_ROW_ID_DOMAIN, |hasher| {
left_id.hash(hasher);
right_id.hash(hasher);
})
}

/// Derives a deterministic row ID for a left/full outer join row with a NULL right side.
#[inline]
pub fn left_join_null_row_id(left_id: RowId) -> RowId {
hash_row_id(LEFT_JOIN_NULL_ROW_ID_DOMAIN, |hasher| {
left_id.hash(hasher);
})
}

/// Derives a deterministic row ID for a right/full outer join row with a NULL left side.
#[inline]
pub fn right_join_null_row_id(right_id: RowId) -> RowId {
hash_row_id(RIGHT_JOIN_NULL_ROW_ID_DOMAIN, |hasher| {
right_id.hash(hasher);
})
}

/// Derives a deterministic row ID for an aggregate group row from its group key.
#[inline]
pub fn aggregate_group_row_id(group_key: &[Value]) -> RowId {
hash_row_id(AGGREGATE_GROUP_ROW_ID_DOMAIN, |hasher| {
group_key.len().hash(hasher);
for value in group_key {
value.hash(hasher);
}
})
}

/// A row in a database table.
#[derive(Clone, Debug)]
pub struct Row {
Expand Down Expand Up @@ -243,4 +334,29 @@ mod tests {
assert!(row.is_dummy());
assert_eq!(row.version(), 5);
}

#[test]
fn test_join_row_id_is_deterministic() {
assert_eq!(join_row_id(1, 2), join_row_id(1, 2));
assert_ne!(join_row_id(1, 2), join_row_id(2, 1));
}

#[test]
fn test_derived_row_id_domains_do_not_collide_for_same_input() {
let base = 42;
assert_ne!(join_row_id(base, 7), left_join_null_row_id(base));
assert_ne!(left_join_null_row_id(base), right_join_null_row_id(base));
}

#[test]
fn test_aggregate_group_row_id_depends_on_group_key() {
assert_eq!(
aggregate_group_row_id(&[Value::Int64(1), Value::String("A".into())]),
aggregate_group_row_id(&[Value::Int64(1), Value::String("A".into())]),
);
assert_ne!(
aggregate_group_row_id(&[Value::Int64(1)]),
aggregate_group_row_id(&[Value::Int64(2)]),
);
}
}
44 changes: 44 additions & 0 deletions crates/core/src/schema/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub struct IndexDef {
unique: bool,
/// Index type.
index_type: IndexType,
/// Optional normalized JSON paths covered by a GIN index.
///
/// `None` means the index covers the full JSON tree.
gin_paths: Option<Vec<String>>,
}

impl IndexDef {
Expand All @@ -87,6 +91,7 @@ impl IndexDef {
columns,
unique: false,
index_type: IndexType::BTree,
gin_paths: None,
}
}

Expand All @@ -102,6 +107,12 @@ impl IndexDef {
self
}

/// Restricts a GIN index to a set of normalized JSON paths.
pub fn with_gin_paths(mut self, paths: Vec<String>) -> Self {
self.gin_paths = if paths.is_empty() { None } else { Some(paths) };
self
}

/// Returns the index name.
#[inline]
pub fn name(&self) -> &str {
Expand Down Expand Up @@ -137,6 +148,24 @@ impl IndexDef {
self.index_type
}

/// Returns the normalized JSON paths covered by this GIN index.
#[inline]
pub fn gin_paths(&self) -> Option<&[String]> {
self.gin_paths.as_deref()
}

/// Returns whether this GIN index can answer lookups for the given path.
#[inline]
pub fn supports_gin_path(&self, path: &str) -> bool {
if self.index_type != IndexType::Gin {
return false;
}

self.gin_paths.as_ref().map_or(true, |paths| {
paths.iter().any(|candidate| candidate == path)
})
}

/// Returns whether this is a single-column index.
#[inline]
pub fn is_single_column(&self) -> bool {
Expand All @@ -158,6 +187,7 @@ impl PartialEq for IndexDef {
#[cfg(test)]
mod tests {
use super::*;
use alloc::string::ToString;
use alloc::vec;

#[test]
Expand Down Expand Up @@ -199,4 +229,18 @@ mod tests {
assert!(!idx.is_single_column());
assert_eq!(idx.columns().len(), 2);
}

#[test]
fn test_gin_index_paths() {
let idx = IndexDef::new("idx_profile", "users", vec![IndexedColumn::new("profile")])
.index_type(IndexType::Gin)
.with_gin_paths(vec!["customer.tier".into(), "risk.bucket".into()]);

assert!(idx.supports_gin_path("customer.tier"));
assert!(!idx.supports_gin_path("flags.strategic"));
assert_eq!(
idx.gin_paths().unwrap(),
["customer.tier".to_string(), "risk.bucket".to_string()]
);
}
}
63 changes: 63 additions & 0 deletions crates/core/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::error::{Error, Result};
use crate::types::DataType;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;

/// A table definition in the database schema.
Expand Down Expand Up @@ -286,6 +287,43 @@ impl TableBuilder {
Ok(self)
}

/// Adds a path-restricted GIN index for a JSONB column.
pub fn add_jsonb_index(
mut self,
name: impl Into<String>,
column: &str,
paths: &[&str],
) -> Result<Self> {
let name = name.into();
Self::check_naming_rules(&name)?;

let column_def = self
.columns
.iter()
.find(|c| c.name() == column)
.ok_or_else(|| Error::InvalidSchema {
message: format!("Column not found: {}", column),
})?;

if column_def.data_type() != DataType::Jsonb {
return Err(Error::InvalidSchema {
message: format!("JSONB index requires Jsonb column: {}", column),
});
}

let normalized_paths = paths
.iter()
.filter(|path| !path.is_empty())
.map(|path| (*path).to_string())
.collect();

let idx = IndexDef::new(name, &self.name, vec![IndexedColumn::new(column)])
.index_type(IndexType::Gin)
.with_gin_paths(normalized_paths);
self.indices.push(idx);
Ok(self)
}

/// Adds a hash index for point-lookups on scalar columns.
pub fn add_hash_index(
mut self,
Expand Down Expand Up @@ -522,6 +560,31 @@ mod tests {
assert_eq!(index.get_index_type(), IndexType::Hash);
assert!(index.is_unique());
}

#[test]
fn test_add_jsonb_index_paths() {
let table = TableBuilder::new("issues")
.unwrap()
.add_column("id", DataType::Int64)
.unwrap()
.add_column("metadata", DataType::Jsonb)
.unwrap()
.add_jsonb_index(
"idx_issues_metadata",
"metadata",
&["customer.tier", "risk.bucket"],
)
.unwrap()
.build()
.unwrap();

let index = table.get_index("idx_issues_metadata").unwrap();
assert_eq!(index.get_index_type(), IndexType::Gin);
assert_eq!(
index.gin_paths().unwrap(),
["customer.tier".to_string(), "risk.bucket".to_string()]
);
}
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ harness = false
default = ["jsonb", "incremental"]
jsonb = []
incremental = []
benchmark = []
benchmark = ["cynos-storage/benchmark"]

[package.metadata.wasm-pack.profile.release]
wasm-opt = false
8 changes: 8 additions & 0 deletions crates/database/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
col,
createDatabase,
initCynos,
snapshotSchemaLayout,
} from '@cynos/core';

await initCynos();
Expand Down Expand Up @@ -73,6 +74,13 @@ const stop = stream.subscribe((rows) => {
console.log('current result', rows);
});

const binaryLayout = snapshotSchemaLayout(stream.getSchemaLayout());
const stopBinary = stream.subscribeBinary((binary) => {
const transferable = binary.intoTransferable();
const rs = new ResultSet(transferable, binaryLayout);
console.log('binary snapshot', rs.toArray());
});

const trace = db
.select('*')
.from('users')
Expand Down
Loading