From 019c649ef6cc3baa954134744857e63dd3d9425e Mon Sep 17 00:00:00 2001 From: zenotme Date: Sun, 25 Jan 2026 23:57:31 +0800 Subject: [PATCH] Add compact theta sketch serialization --- datasketches/src/common/mod.rs | 2 + datasketches/src/common/seed_hash.rs | 28 + datasketches/src/countmin/serialization.rs | 11 +- datasketches/src/theta/hash_table.rs | 4 + datasketches/src/theta/implementation.md | 179 ++++++ datasketches/src/theta/mod.rs | 3 + datasketches/src/theta/serialization.rs | 636 +++++++++++++++++++++ datasketches/src/theta/sketch.rs | 305 ++++++++++ 8 files changed, 1158 insertions(+), 10 deletions(-) create mode 100644 datasketches/src/common/seed_hash.rs create mode 100644 datasketches/src/theta/implementation.md create mode 100644 datasketches/src/theta/serialization.rs diff --git a/datasketches/src/common/mod.rs b/datasketches/src/common/mod.rs index 017ddc9..6c01351 100644 --- a/datasketches/src/common/mod.rs +++ b/datasketches/src/common/mod.rs @@ -20,8 +20,10 @@ // public common components for datasketches crate mod num_std_dev; mod resize; +mod seed_hash; pub use self::num_std_dev::NumStdDev; pub use self::resize::ResizeFactor; // private to datasketches crate pub(crate) mod binomial_bounds; +pub(crate) use self::seed_hash::compute_seed_hash; diff --git a/datasketches/src/common/seed_hash.rs b/datasketches/src/common/seed_hash.rs new file mode 100644 index 0000000..198af78 --- /dev/null +++ b/datasketches/src/common/seed_hash.rs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hash::Hasher; + +use crate::hash::MurmurHash3X64128; + +pub(crate) fn compute_seed_hash(seed: u64) -> u16 { + let mut hasher = MurmurHash3X64128::with_seed(0); + hasher.write(&seed.to_le_bytes()); + let (h1, _) = hasher.finish128(); + (h1 & 0xffff) as u16 +} + diff --git a/datasketches/src/countmin/serialization.rs b/datasketches/src/countmin/serialization.rs index bb6517d..66d8881 100644 --- a/datasketches/src/countmin/serialization.rs +++ b/datasketches/src/countmin/serialization.rs @@ -15,19 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use std::hash::Hasher; - -use crate::hash::MurmurHash3X64128; - pub(super) const PREAMBLE_LONGS_SHORT: u8 = 2; pub(super) const SERIAL_VERSION: u8 = 1; pub(super) const COUNTMIN_FAMILY_ID: u8 = 18; pub(super) const FLAGS_IS_EMPTY: u8 = 1 << 0; pub(super) const LONG_SIZE_BYTES: usize = 8; -pub(super) fn compute_seed_hash(seed: u64) -> u16 { - let mut hasher = MurmurHash3X64128::with_seed(0); - hasher.write(&seed.to_le_bytes()); - let (h1, _) = hasher.finish128(); - (h1 & 0xffff) as u16 -} +pub(super) use crate::common::compute_seed_hash; diff --git a/datasketches/src/theta/hash_table.rs b/datasketches/src/theta/hash_table.rs index ecba0fa..b05ac6c 100644 --- a/datasketches/src/theta/hash_table.rs +++ b/datasketches/src/theta/hash_table.rs @@ -296,6 +296,10 @@ impl ThetaHashTable { self.lg_nom_size } + pub(crate) fn hash_seed(&self) -> u64 { + self.hash_seed + } + /// Get stride for hash table probing fn get_stride(key: u64, lg_size: u8) -> usize { (2 * ((key >> (lg_size)) & STRIDE_MASK) + 1) as usize diff --git a/datasketches/src/theta/implementation.md b/datasketches/src/theta/implementation.md new file mode 100644 index 0000000..652b7ab --- /dev/null +++ b/datasketches/src/theta/implementation.md @@ -0,0 +1,179 @@ +# Compact Theta Sketch (Implementation Notes) + +This document describes how the Rust implementation should represent and interoperate with the +Apache DataSketches **Compact Theta Sketch** formats used by the Java and C++ libraries. + +The intent is cross-language compatibility: + +- **On-heap representation**: a minimal immutable form of a Theta sketch. +- **Binary format**: compatible serialization/deserialization (uncompressed `serVer = 3`), matching + the preamble layout and flags used by `datasketches-java` and `datasketches-cpp`. 
+ +--- + +## compact theta sketch + +A compact theta sketch is the immutable, serialization-friendly form of a theta sketch: + +- It stores a **compact array** of retained hash values (no interstitial zeros like the update + sketch’s hash table). +- It stores the **theta** threshold (`thetaLong`) and a **seed hash** (16-bit). +- It can be **ordered** (sorted ascending by hash) or **unordered**. +- It is **read-only** (cannot be updated), but is intended to participate in set operations. + +### Hash invariants (cross-language) + +Java/C++ (and this Rust crate) treat retained hashes as: + +- 63-bit, non-negative values derived from MurmurHash3 (128-bit), taking `h1 >> 1`. +- `0` is reserved for empty slots and must not appear as a retained entry. +- Every retained entry must satisfy: `0 < hash < thetaLong`. +- `thetaLong` uses the signed max (`Long.MAX_VALUE` / `i64::MAX`) as “1.0” (no sampling). + +In Rust, `MAX_THETA` is `i64::MAX as u64`, matching Java/C++. + +### Compact-state truth table (Java/C++ behavior) + +When producing a compact sketch (or serializing), Java defines a truth table over `(empty, curCount, +thetaLong)` and applies corrections in specific cases (see `CompactOperations.correctThetaOnCompact` +and related helpers): + +- Normal empty: `empty = true`, `curCount = 0`, `thetaLong = MAX_THETA` → encoded as an 8-byte sketch. +- A sketch with `p < 1.0` but never updated may have `empty = true`, `curCount = 0`, + `thetaLong < MAX_THETA` internally; Java corrects theta back to `MAX_THETA` during compaction/ + serialization so it becomes a normal empty compact sketch. +- A compact sketch can have its **empty flag false** (i.e. be non-empty) while still having `curCount = 0` and + `thetaLong < MAX_THETA` as a possible result of set operations; this must serialize with + `preLongs = 3` to preserve theta. + +Rust should mirror these behaviors for cross-language parity. 
+ +--- + +## serialization/deserialization + +This section documents the uncompressed compact theta sketch binary format (`serVer = 3`), as used +by Java and C++. + +### Endianness + +Multi-byte integers are written in the platform’s native endianness in the Java/C++ implementations, +with a legacy “big-endian” bit in the flags byte (bit 0). In practice, modern platforms are little +endian and serialize with that bit cleared. + +For Rust cross-platform robustness: + +- **Serialize** using little-endian encodings and keep the big-endian flag bit clear. +- **Deserialize** by reading the big-endian flag bit and decoding multi-byte fields accordingly. + +### Preamble (first 8 bytes) + +All compact sketches start with a single 8-byte “preamble long” with fixed byte offsets: + +| Byte offset | Field | Notes | +|---:|---|---| +| 0 | `preLongs` (low 6 bits) | Number of 8-byte longs in the preamble (1–3 for v3 compact). | +| 1 | `serVer` | Must be `3` for uncompressed compact sketches. | +| 2 | `family` | Must be `3` (`Family.COMPACT`). | +| 3 | `lgNomLongs` | **Unused for compact**; must be written as `0`. | +| 4 | `lgArrLongs` | **Unused for compact**; must be written as `0`. | +| 5 | `flags` | Bitfield, defined below. | +| 6–7 | `seedHash` (`u16`) | Must match `computeSeedHash(expectedSeed)` (Java/C++). | + +### Flags byte (byte 5) + +Bit positions follow Java/C++: + +- Bit 0: big-endian legacy indicator (reserved in Java, still present in C++). +- Bit 1: read-only (must be set for compact sketches). +- Bit 2: empty. +- Bit 3: compact (must be set for compact sketches). +- Bit 4: ordered. +- Bit 5: single-item. +- Bits 6–7: reserved (must be zero). + +### `preLongs` and payload layout (v3) + +The total serialized size is `(preLongs + curCount) * 8` bytes, except the “empty compact” case +which is always exactly 8 bytes. 
+ +The format varies by `(empty, curCount, thetaLong)`: + +#### 1) Empty compact sketch (8 bytes) + +- `preLongs = 1` +- `flags.empty = 1` +- No `curCount`, no `thetaLong`, no entries. +- `thetaLong` is implicitly `MAX_THETA`. + +#### 2) Single item (16 bytes) + +- `preLongs = 1` +- `flags.singleItem = 1`, `flags.ordered = 1` (Java sets ordered for single-item). +- No `curCount`, no `thetaLong`. +- Payload: one 8-byte hash at offset `8`. + +#### 3) Exact compact sketch (non-estimating) + +- `thetaLong == MAX_THETA` +- `preLongs = 2` for `curCount > 1`; (for `curCount == 1`, Java uses the single-item form above). +- Long at offset `8` contains: + - `curCount` as a 4-byte int at offsets `8..12` + - `p` as a 4-byte float at offsets `12..16` (**not used**; Java writes `0.0` to match C++). +- Payload: `curCount` hashes starting at offset `preLongs * 8` (i.e. 16). + +#### 4) Estimating compact sketch + +- `thetaLong < MAX_THETA` +- `preLongs = 3` +- Long at offset `8` contains: + - `curCount` as a 4-byte int at offsets `8..12` + - `p` as a 4-byte float at offsets `12..16` (**not used**; Java writes `0.0`). +- Long at offset `16` contains: + - `thetaLong` as an 8-byte long at offsets `16..24`. +- Payload: `curCount` hashes starting at offset `preLongs * 8` (i.e. 24). + +### Serialization algorithm (v3, conceptual) + +1. Determine `(empty, curCount, thetaLong, ordered)` from the compact sketch state. +2. Apply the “empty + never-updated sampled sketch” correction: + if `empty && curCount == 0`, serialize as an empty compact with `thetaLong = MAX_THETA`. +3. Compute `preLongs`: + - if `thetaLong < MAX_THETA` → `preLongs = 3` + - else if `empty` → `preLongs = 1` + - else if `curCount == 1` → `preLongs = 1` (single item) + - else → `preLongs = 2` +4. Write preamble fields; ensure `lgNomLongs = 0`, `lgArrLongs = 0`, and set flags: + `readOnly=1`, `compact=1`, plus `empty/ordered/singleItem` as applicable. +5. For `preLongs >= 2`, write `curCount` and `p = 0.0f`. +6. 
For `preLongs == 3`, write `thetaLong`. +7. Write the `curCount` retained hashes, ordered if requested. + +### Deserialization requirements (v3) + +When decoding bytes into a compact theta sketch: + +- Validate `family == 3` and `serVer == 3`. +- Validate flags: + - `compact` must be set + - `readOnly` must be set + - reserved bits must be zero (or tolerated for legacy inputs, depending on strictness) +- Validate `seedHash` matches the expected seed. +- If `empty` flag is set: + - require `preLongs == 1` and total size == 8 + - return the empty compact sketch (`thetaLong = MAX_THETA`, `entries = []`) +- Else if `singleItem`: + - require `preLongs == 1` and total size == 16 + - read one hash at offset 8 +- Else: + - read `curCount` (u32) at offset 8 + - if `preLongs == 3`, read `thetaLong` at offset 16; else `thetaLong = MAX_THETA` + - read `curCount` hashes from offset `preLongs * 8` + - optionally validate ordering if `ordered` flag is set + +### Note on compressed format (`serVer = 4`) + +Java/C++ also support a compressed, delta-encoded ordered compact sketch (`serVer = 4`), with a +different layout (variable-length retained entries count and packed deltas). This is not required +for basic cross-language interoperability, but can be added later for reduced serialized sizes. + diff --git a/datasketches/src/theta/mod.rs b/datasketches/src/theta/mod.rs index ccaac52..faf68ca 100644 --- a/datasketches/src/theta/mod.rs +++ b/datasketches/src/theta/mod.rs @@ -28,6 +28,7 @@ //! configurable accuracy and memory usage. The implementation supports: //! //! - **ThetaSketch**: Mutable sketch for building from input data +//! - **CompactThetaSketch**: Immutable sketch for serialization and set operations //! //! # Usage //! 
@@ -40,6 +41,8 @@ mod hash_table; mod sketch; +mod serialization; +pub use self::sketch::CompactThetaSketch; pub use self::sketch::ThetaSketch; pub use self::sketch::ThetaSketchBuilder; diff --git a/datasketches/src/theta/serialization.rs b/datasketches/src/theta/serialization.rs new file mode 100644 index 0000000..bcbc94b --- /dev/null +++ b/datasketches/src/theta/serialization.rs @@ -0,0 +1,636 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary serialization format constants and helpers for Theta sketches. 
+ +use crate::codec::SketchBytes; +use crate::codec::SketchSlice; +use crate::common::compute_seed_hash; +use crate::error::Error; +use crate::hash::DEFAULT_UPDATE_SEED; +use crate::theta::hash_table::MAX_THETA; + +use super::sketch::CompactThetaSketch; + +pub(crate) const FAMILY_COMPACT: u8 = 3; +pub(crate) const SERIAL_VERSION_V3: u8 = 3; +pub(crate) const SERIAL_VERSION_V4: u8 = 4; + +pub(crate) const FLAGS_IS_BIG_ENDIAN: u8 = 1 << 0; +pub(crate) const FLAGS_IS_READ_ONLY: u8 = 1 << 1; +pub(crate) const FLAGS_IS_EMPTY: u8 = 1 << 2; +pub(crate) const FLAGS_IS_COMPACT: u8 = 1 << 3; +pub(crate) const FLAGS_IS_ORDERED: u8 = 1 << 4; +pub(crate) const FLAGS_IS_SINGLE_ITEM: u8 = 1 << 5; +pub(crate) const FLAGS_RESERVED_MASK: u8 = 0b1100_0000; + +pub(crate) fn serialize_v3(sketch: &CompactThetaSketch) -> Vec { + let mut entries = sketch.entries.clone(); + let mut theta = sketch.theta; + let empty = sketch.empty || (entries.is_empty() && theta == MAX_THETA); + if empty && entries.is_empty() { + // Java/C++ correctThetaOnCompact(empty && curCount==0) + theta = MAX_THETA; + } + + let single_item = !empty && entries.len() == 1 && theta == MAX_THETA; + let ordered = sketch.ordered || single_item; + if ordered && entries.len() > 1 { + entries.sort_unstable(); + } + + let pre_longs = if theta < MAX_THETA { + 3 + } else if empty { + 1 + } else if entries.len() == 1 { + 1 + } else { + 2 + }; + + let mut flags = 0u8; + flags |= FLAGS_IS_READ_ONLY; + flags |= FLAGS_IS_COMPACT; + if empty { + flags |= FLAGS_IS_EMPTY; + } + if ordered { + flags |= FLAGS_IS_ORDERED; + } + if single_item { + flags |= FLAGS_IS_SINGLE_ITEM; + } + + let out_longs = pre_longs as usize + entries.len(); + let mut bytes = SketchBytes::with_capacity(out_longs * 8); + bytes.write_u8(pre_longs & 0x3f); // upper 2 bits unused for compact sketches + bytes.write_u8(SERIAL_VERSION_V3); + bytes.write_u8(FAMILY_COMPACT); + bytes.write_u8(0); // lgNomLongs unused for compact + bytes.write_u8(0); // lgArrLongs unused 
for compact + bytes.write_u8(flags & !FLAGS_IS_BIG_ENDIAN); // always serialize as little-endian + bytes.write_u16_le(sketch.seed_hash); + + if pre_longs == 1 { + if entries.len() == 1 && !empty { + bytes.write_u64_le(entries[0]); + } + return bytes.into_bytes(); + } + + bytes.write_u32_le(entries.len() as u32); + bytes.write_f32_le(0.0); // not used by compact sketches; match Java/C++ + if pre_longs == 3 { + bytes.write_u64_le(theta); + } + for hash in entries.iter().copied() { + bytes.write_u64_le(hash); + } + bytes.into_bytes() +} + +pub(crate) fn serialize_compressed(sketch: &CompactThetaSketch) -> Vec { + if is_suitable_for_compression(sketch) { + serialize_v4(sketch) + } else { + serialize_v3(sketch) + } +} + +pub(crate) fn deserialize(bytes: &[u8]) -> Result { + deserialize_with_seed(bytes, DEFAULT_UPDATE_SEED) +} + +pub(crate) fn deserialize_with_seed(bytes: &[u8], expected_seed: u64) -> Result { + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } + + let mut cursor = SketchSlice::new(bytes); + let pre0 = cursor.read_u8().map_err(make_error("preamble_longs"))?; + let pre_longs = pre0 & 0x3f; + let ser_ver = cursor.read_u8().map_err(make_error("serial_version"))?; + let family = cursor.read_u8().map_err(make_error("family_id"))?; + + if family != FAMILY_COMPACT { + return Err(Error::invalid_family(FAMILY_COMPACT, family, "CompactThetaSketch")); + } + + match ser_ver { + SERIAL_VERSION_V3 => deserialize_v3(bytes, pre_longs, cursor, expected_seed), + SERIAL_VERSION_V4 => deserialize_v4(bytes, pre_longs, cursor, expected_seed), + _ => Err(Error::deserial(format!( + "unsupported serial version: expected 3 or 4, got {ser_ver}", + ))), + } +} + +fn deserialize_v3( + bytes: &[u8], + pre_longs: u8, + mut cursor: SketchSlice<'_>, + expected_seed: u64, +) -> Result { + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } + + 
cursor.read_u8().map_err(make_error("lg_nom_longs"))?; + cursor.read_u8().map_err(make_error("lg_arr_longs"))?; + let flags = cursor.read_u8().map_err(make_error("flags"))?; + + let big_endian = (flags & FLAGS_IS_BIG_ENDIAN) != 0; + let read_only = (flags & FLAGS_IS_READ_ONLY) != 0; + let compact = (flags & FLAGS_IS_COMPACT) != 0; + if !read_only || !compact { + return Err(Error::deserial( + "corrupted: compact sketches must have read-only and compact flags set", + )); + } + if (flags & FLAGS_RESERVED_MASK) != 0 { + return Err(Error::deserial("corrupted: reserved flag bits must be zero")); + } + + let empty_flag = (flags & FLAGS_IS_EMPTY) != 0; + let ordered = (flags & FLAGS_IS_ORDERED) != 0; + let single_item = (flags & FLAGS_IS_SINGLE_ITEM) != 0; + + let seed_hash = if big_endian { + cursor.read_u16_be().map_err(make_error("seed_hash"))? + } else { + cursor.read_u16_le().map_err(make_error("seed_hash"))? + }; + + if empty_flag { + if pre_longs != 1 { + return Err(Error::invalid_preamble_longs(1, pre_longs)); + } + if bytes.len() != 8 { + return Err(Error::deserial(format!( + "invalid empty compact theta sketch size: expected 8, got {}", + bytes.len() + ))); + } + return Ok(CompactThetaSketch { + entries: vec![], + theta: MAX_THETA, + seed_hash, + ordered, + empty: true, + }); + } + + let expected_seed_hash = compute_seed_hash(expected_seed); + if seed_hash != expected_seed_hash { + return Err(Error::deserial(format!( + "incompatible seed hash: expected {expected_seed_hash}, got {seed_hash}", + ))); + } + + if single_item { + if pre_longs != 1 { + return Err(Error::invalid_preamble_longs(1, pre_longs)); + } + if bytes.len() != 16 { + return Err(Error::deserial(format!( + "invalid single-item compact theta sketch size: expected 16, got {}", + bytes.len() + ))); + } + let hash = if big_endian { + cursor.read_u64_be().map_err(make_error("single_hash"))? + } else { + cursor.read_u64_le().map_err(make_error("single_hash"))? 
+ }; + if hash == 0 || hash >= MAX_THETA { + return Err(Error::deserial("corrupted: invalid retained hash value")); + } + return Ok(CompactThetaSketch { + entries: vec![hash], + theta: MAX_THETA, + seed_hash, + ordered: true, // single-item sketches are ordered in Java/C++ + empty: false, + }); + } + + if pre_longs != 2 && pre_longs != 3 { + return Err(Error::deserial(format!( + "invalid compact theta preamble_longs: expected 2 or 3, got {pre_longs}", + ))); + } + + let cur_count = if big_endian { + cursor.read_u32_be().map_err(make_error("cur_count"))? + } else { + cursor.read_u32_le().map_err(make_error("cur_count"))? + } as usize; + if big_endian { + cursor.read_f32_be().map_err(make_error("p_float"))?; + } else { + cursor.read_f32_le().map_err(make_error("p_float"))?; + } + + let theta = if pre_longs == 3 { + if big_endian { + cursor.read_u64_be().map_err(make_error("theta_long"))? + } else { + cursor.read_u64_le().map_err(make_error("theta_long"))? + } + } else { + MAX_THETA + }; + + let mut entries = Vec::with_capacity(cur_count); + for _ in 0..cur_count { + let hash = if big_endian { + cursor.read_u64_be().map_err(make_error("entries"))? + } else { + cursor.read_u64_le().map_err(make_error("entries"))? 
+ }; + if hash == 0 || hash >= theta { + return Err(Error::deserial("corrupted: invalid retained hash value")); + } + entries.push(hash); + } + + if ordered && entries.len() > 1 { + for i in 1..entries.len() { + if entries[i] <= entries[i - 1] { + return Err(Error::deserial( + "corrupted: ordered compact sketch entries must be strictly increasing", + )); + } + } + } + + let empty = cur_count == 0 && theta == MAX_THETA; + + Ok(CompactThetaSketch { + entries, + theta, + seed_hash, + ordered, + empty, + }) +} + +fn deserialize_v4( + _bytes: &[u8], + pre_longs: u8, + mut cursor: SketchSlice<'_>, + expected_seed: u64, +) -> Result { + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } + + if pre_longs != 1 && pre_longs != 2 { + return Err(Error::deserial(format!( + "invalid compact theta preamble_longs for v4: expected 1 or 2, got {pre_longs}", + ))); + } + + let entry_bits = cursor.read_u8().map_err(make_error("entry_bits"))?; + let num_entries_bytes = cursor + .read_u8() + .map_err(make_error("num_entries_bytes"))?; + let flags = cursor.read_u8().map_err(make_error("flags"))?; + + let big_endian = (flags & FLAGS_IS_BIG_ENDIAN) != 0; + let read_only = (flags & FLAGS_IS_READ_ONLY) != 0; + let compact = (flags & FLAGS_IS_COMPACT) != 0; + let ordered = (flags & FLAGS_IS_ORDERED) != 0; + if !read_only || !compact || !ordered { + return Err(Error::deserial( + "corrupted: v4 compact sketches must be read-only, compact, and ordered", + )); + } + if (flags & FLAGS_RESERVED_MASK) != 0 { + return Err(Error::deserial("corrupted: reserved flag bits must be zero")); + } + + let empty_flag = (flags & FLAGS_IS_EMPTY) != 0; + + let seed_hash = if big_endian { + cursor.read_u16_be().map_err(make_error("seed_hash"))? + } else { + cursor.read_u16_le().map_err(make_error("seed_hash"))? + }; + + let theta = if pre_longs == 2 { + if big_endian { + cursor.read_u64_be().map_err(make_error("theta_long"))? 
+ } else { + cursor.read_u64_le().map_err(make_error("theta_long"))? + } + } else { + MAX_THETA + }; + + if empty_flag { + return Ok(CompactThetaSketch { + entries: vec![], + theta, + seed_hash, + ordered: true, + empty: theta == MAX_THETA, + }); + } + + let expected_seed_hash = compute_seed_hash(expected_seed); + if seed_hash != expected_seed_hash { + return Err(Error::deserial(format!( + "incompatible seed hash: expected {expected_seed_hash}, got {seed_hash}", + ))); + } + + if num_entries_bytes == 0 || num_entries_bytes > 4 { + return Err(Error::deserial(format!( + "corrupted: invalid num_entries_bytes: expected 1..=4, got {num_entries_bytes}", + ))); + } + + let mut num_entries: u32 = 0; + if big_endian { + for _ in 0..num_entries_bytes { + let b = cursor.read_u8().map_err(make_error("num_entries"))? as u32; + num_entries = (num_entries << 8) | b; + } + } else { + for i in 0..num_entries_bytes { + let b = cursor.read_u8().map_err(make_error("num_entries"))? as u32; + num_entries |= b << (i * 8); + } + } + + if num_entries == 0 { + return Ok(CompactThetaSketch { + entries: vec![], + theta, + seed_hash, + ordered: true, + empty: theta == MAX_THETA, + }); + } + + if entry_bits == 0 || entry_bits > 64 { + return Err(Error::deserial(format!( + "corrupted: invalid entry_bits: expected 1..=64, got {entry_bits}", + ))); + } + + let num_entries_usize = num_entries as usize; + let mut deltas = vec![0u64; num_entries_usize]; + + // unpack blocks of 8 deltas + let mut i = 0usize; + while i + 7 < num_entries_usize { + let mut block = vec![0u8; entry_bits as usize]; + cursor.read_exact(&mut block).map_err(make_error("delta_block8"))?; + unpack_block8(&mut deltas[i..i + 8], entry_bits, &block); + i += 8; + } + + // unpack remainder + if i < num_entries_usize { + let rem = num_entries_usize - i; + let bytes_needed = whole_bytes_to_hold_bits(rem * entry_bits as usize); + let mut tail = vec![0u8; bytes_needed]; + cursor.read_exact(&mut tail).map_err(make_error("delta_tail"))?; + 
unpack_tail(&mut deltas[i..], entry_bits, &tail); + } + + // undo deltas + let mut entries = vec![0u64; num_entries_usize]; + let mut previous = 0u64; + for (dst, delta) in entries.iter_mut().zip(deltas.into_iter()) { + *dst = previous + delta; + previous = *dst; + } + + Ok(CompactThetaSketch { + entries, + theta, + seed_hash, + ordered: true, + empty: false, + }) +} + +fn serialize_v4(sketch: &CompactThetaSketch) -> Vec { + // v4 requires ordered, non-empty, and (unless estimating) not a single item. + let mut entries = sketch.entries.clone(); + if entries.len() > 1 { + entries.sort_unstable(); + } + + let is_estimation_mode = sketch.theta < MAX_THETA; + let pre_longs = if is_estimation_mode { 2 } else { 1 }; + let entry_bits = std::cmp::max(compute_entry_bits(&entries), 1); + let num_entries_bytes = num_entries_bytes(entries.len()); + + // Pre-size exactly like C++: preamble longs (8 bytes each) + num_entries_bytes + packed bits. + let compressed_bits = entry_bits as usize * entries.len(); + let compressed_bytes = whole_bytes_to_hold_bits(compressed_bits); + let out_bytes = (pre_longs as usize * 8) + (num_entries_bytes as usize) + compressed_bytes; + let mut bytes = SketchBytes::with_capacity(out_bytes); + + bytes.write_u8(pre_longs); + bytes.write_u8(SERIAL_VERSION_V4); + bytes.write_u8(FAMILY_COMPACT); + bytes.write_u8(entry_bits); + bytes.write_u8(num_entries_bytes); + + let mut flags = 0u8; + flags |= FLAGS_IS_READ_ONLY; + flags |= FLAGS_IS_COMPACT; + flags |= FLAGS_IS_ORDERED; + bytes.write_u8(flags & !FLAGS_IS_BIG_ENDIAN); + bytes.write_u16_le(sketch.seed_hash); + + if is_estimation_mode { + bytes.write_u64_le(sketch.theta); + } + + // num_entries stored little-endian with num_entries_bytes bytes + let mut n = entries.len() as u32; + for _ in 0..num_entries_bytes { + bytes.write_u8((n & 0xff) as u8); + n >>= 8; + } + + // pack deltas + let mut previous = 0u64; + let mut i = 0usize; + let mut block = vec![0u8; entry_bits as usize]; + + while i + 7 < 
entries.len() { + let mut deltas = [0u64; 8]; + for j in 0..8 { + let entry = entries[i + j]; + deltas[j] = entry - previous; + previous = entry; + } + block.fill(0); + pack_block8(&deltas, entry_bits, &mut block); + bytes.write(&block); + i += 8; + } + + if i < entries.len() { + let rem = entries.len() - i; + let bytes_needed = whole_bytes_to_hold_bits(rem * entry_bits as usize); + let mut tail = vec![0u8; bytes_needed]; + pack_tail(&entries[i..], entry_bits, &mut previous, &mut tail); + bytes.write(&tail); + } + + bytes.into_bytes() +} + +fn is_suitable_for_compression(sketch: &CompactThetaSketch) -> bool { + if !sketch.ordered { + return false; + } + let n = sketch.entries.len(); + if n == 0 { + return false; + } + if n == 1 && sketch.theta == MAX_THETA { + return false; + } + true +} + +fn compute_entry_bits(entries: &[u64]) -> u8 { + let mut previous = 0u64; + let mut ored = 0u64; + for &entry in entries { + let delta = entry - previous; + ored |= delta; + previous = entry; + } + (64 - ored.leading_zeros()) as u8 +} + +fn num_entries_bytes(num_entries: usize) -> u8 { + let n = num_entries as u32; + let bits = 32 - n.leading_zeros(); + whole_bytes_to_hold_bits(bits as usize) as u8 +} + +fn whole_bytes_to_hold_bits(bits: usize) -> usize { + (bits + 7) / 8 +} + +fn pack_bits(value: u64, mut bits: u8, out: &mut [u8], mut index: usize, offset: u8) -> (usize, u8) { + if offset > 0 { + let chunk_bits = 8 - offset; + let mask = ((1u16 << chunk_bits) - 1) as u8; + if bits < chunk_bits { + out[index] |= ((value << (chunk_bits - bits)) as u8) & mask; + return (index, offset + bits); + } + out[index] |= ((value >> (bits - chunk_bits)) as u8) & mask; + index += 1; + bits -= chunk_bits; + } + + while bits >= 8 { + out[index] = (value >> (bits - 8)) as u8; + index += 1; + bits -= 8; + } + if bits > 0 { + out[index] = (value << (8 - bits)) as u8; + return (index, bits); + } + (index, 0) +} + +fn unpack_bits(mut bits: u8, input: &[u8], mut index: usize, mut offset: u8) -> 
(u64, usize, u8) { + let avail_bits = 8 - offset; + let chunk_bits = std::cmp::min(avail_bits, bits); + let mask = ((1u16 << chunk_bits) - 1) as u8; + + let mut value = ((input[index] >> (avail_bits - chunk_bits)) & mask) as u64; + if avail_bits == chunk_bits { + index += 1; + } + offset = (offset + chunk_bits) & 7; + bits -= chunk_bits; + + while bits >= 8 { + value = (value << 8) | (input[index] as u64); + index += 1; + bits -= 8; + } + if bits > 0 { + value <<= bits; + value |= (input[index] >> (8 - bits)) as u64; + return (value, index, bits); + } + (value, index, offset) +} + +fn pack_block8(deltas: &[u64; 8], entry_bits: u8, out: &mut [u8]) { + let mut index = 0usize; + let mut offset = 0u8; + for &delta in deltas { + (index, offset) = pack_bits(delta, entry_bits, out, index, offset); + } + debug_assert_eq!(index, entry_bits as usize); + debug_assert_eq!(offset, 0); +} + +fn unpack_block8(out_deltas: &mut [u64], entry_bits: u8, input: &[u8]) { + let mut index = 0usize; + let mut offset = 0u8; + for slot in out_deltas.iter_mut() { + let (value, new_index, new_offset) = unpack_bits(entry_bits, input, index, offset); + *slot = value; + index = new_index; + offset = new_offset; + } +} + +fn pack_tail(entries: &[u64], entry_bits: u8, previous: &mut u64, out: &mut [u8]) { + let mut index = 0usize; + let mut offset = 0u8; + for &entry in entries { + let delta = entry - *previous; + *previous = entry; + (index, offset) = pack_bits(delta, entry_bits, out, index, offset); + } + if offset > 0 { + index += 1; + } + debug_assert_eq!(index, out.len()); +} + +fn unpack_tail(out_deltas: &mut [u64], entry_bits: u8, input: &[u8]) { + let mut index = 0usize; + let mut offset = 0u8; + for slot in out_deltas.iter_mut() { + let (value, new_index, new_offset) = unpack_bits(entry_bits, input, index, offset); + *slot = value; + index = new_index; + offset = new_offset; + } +} diff --git a/datasketches/src/theta/sketch.rs b/datasketches/src/theta/sketch.rs index 506b2b6..50f2ce6 
100644
--- a/datasketches/src/theta/sketch.rs
+++ b/datasketches/src/theta/sketch.rs
@@ -25,12 +25,14 @@ use std::hash::Hash;
 use crate::common::NumStdDev;
 use crate::common::ResizeFactor;
 use crate::common::binomial_bounds;
+use crate::common::compute_seed_hash;
 use crate::hash::DEFAULT_UPDATE_SEED;
 use crate::theta::hash_table::DEFAULT_LG_K;
 use crate::theta::hash_table::MAX_LG_K;
 use crate::theta::hash_table::MAX_THETA;
 use crate::theta::hash_table::MIN_LG_K;
 use crate::theta::hash_table::ThetaHashTable;
+use crate::theta::serialization;
 
 /// Mutable theta sketch for building from input data
 #[derive(Debug)]
@@ -173,6 +175,43 @@ impl ThetaSketch {
         self.table.iter()
     }
 
+    /// Returns this sketch in compact (immutable) form.
+    ///
+    /// If `ordered` is true, retained hash values are sorted in ascending order.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use datasketches::theta::ThetaSketch;
+    /// let mut sketch = ThetaSketch::builder().build();
+    /// sketch.update("apple");
+    /// let compact = sketch.compact(true);
+    /// assert_eq!(compact.num_retained(), 1);
+    /// ```
+    pub fn compact(&self, ordered: bool) -> CompactThetaSketch {
+        let mut entries: Vec<u64> = self.iter().collect();
+        if ordered && entries.len() > 1 {
+            entries.sort_unstable();
+        }
+
+        let theta = if entries.is_empty() {
+            // Mirrors Java's correctThetaOnCompact() for never-updated sketches built with
+            // p < 1.0. NOTE(review): also taken whenever nothing is retained; confirm intent.
+            MAX_THETA
+        } else {
+            self.table.theta()
+        };
+        let empty = entries.is_empty();
+
+        CompactThetaSketch {
+            entries,
+            theta,
+            seed_hash: compute_seed_hash(self.table.hash_seed()),
+            ordered,
+            empty,
+        }
+    }
+
     /// Returns the approximate lower error bound given the specified number of Standard Deviations.
     ///
     /// # Arguments
@@ -247,6 +286,123 @@ impl ThetaSketch {
     }
 }
 
+/// Compact (immutable) theta sketch.
+///
+/// This is the serialized-friendly form of a theta sketch: a compact array of retained hash values
+/// plus theta and a 16-bit seed hash. It can be ordered (sorted ascending) or unordered.
+#[derive(Clone, Debug)]
+pub struct CompactThetaSketch {
+    pub(crate) entries: Vec<u64>,
+    pub(crate) theta: u64,
+    pub(crate) seed_hash: u16,
+    pub(crate) ordered: bool,
+    pub(crate) empty: bool,
+}
+
+impl CompactThetaSketch {
+    /// Returns the cardinality estimate.
+    pub fn estimate(&self) -> f64 {
+        if self.is_empty() {
+            return 0.0;
+        }
+        let num_retained = self.num_retained() as f64;
+        if self.theta == MAX_THETA {
+            return num_retained;
+        }
+        let theta = self.theta as f64 / MAX_THETA as f64;
+        num_retained / theta
+    }
+
+    /// Returns theta as a fraction (0.0 to 1.0).
+    pub fn theta(&self) -> f64 {
+        self.theta as f64 / MAX_THETA as f64
+    }
+
+    /// Returns theta as u64.
+    pub fn theta64(&self) -> u64 {
+        self.theta
+    }
+
+    /// Returns true if this sketch is empty.
+    pub fn is_empty(&self) -> bool {
+        self.empty
+    }
+
+    /// Returns true if this sketch is in estimation mode.
+    pub fn is_estimation_mode(&self) -> bool {
+        self.theta < MAX_THETA
+    }
+
+    /// Returns the number of retained entries.
+    pub fn num_retained(&self) -> usize {
+        self.entries.len()
+    }
+
+    /// Returns true if retained entries are ordered (sorted ascending).
+    pub fn is_ordered(&self) -> bool {
+        self.ordered
+    }
+
+    /// Returns the 16-bit seed hash.
+    pub fn seed_hash(&self) -> u16 {
+        self.seed_hash
+    }
+
+    /// Returns an iterator over the retained hash values.
+    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+        self.entries.iter().copied()
+    }
+
+    /// Returns the approximate lower error bound given the specified number of Standard Deviations.
+    pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
+        if !self.is_estimation_mode() {
+            return self.num_retained() as f64;
+        }
+        binomial_bounds::lower_bound(self.num_retained() as u64, self.theta(), num_std_dev)
+            .expect("theta should always be valid")
+    }
+
+    /// Returns the approximate upper error bound given the specified number of Standard Deviations.
+    pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
+        if !self.is_estimation_mode() {
+            return self.num_retained() as f64;
+        }
+        binomial_bounds::upper_bound(
+            self.num_retained() as u64,
+            self.theta(),
+            num_std_dev,
+            self.is_empty(),
+        )
+        .expect("theta should always be valid")
+    }
+
+    /// Serializes this sketch into the uncompressed (`serVer = 3`) compact theta format.
+    pub fn serialize(&self) -> Vec<u8> {
+        serialization::serialize_v3(self)
+    }
+
+    /// Deserializes a compact theta sketch from bytes using the default seed.
+    pub fn deserialize(bytes: &[u8]) -> Result<Self, crate::error::Error> {
+        serialization::deserialize(bytes)
+    }
+
+    /// Deserializes a compact theta sketch from bytes using the provided expected seed.
+    pub fn deserialize_with_seed(
+        bytes: &[u8],
+        expected_seed: u64,
+    ) -> Result<Self, crate::error::Error> {
+        serialization::deserialize_with_seed(bytes, expected_seed)
+    }
+
+    /// Serializes this sketch in compressed form if applicable.
+    ///
+    /// This uses `serVer = 4` when the sketch is ordered and suitable for compression, and falls
+    /// back to uncompressed `serVer = 3` otherwise.
+    pub fn serialize_compressed(&self) -> Vec<u8> {
+        serialization::serialize_compressed(self)
+    }
+}
+
 /// Builder for ThetaSketch
 #[derive(Debug)]
 pub struct ThetaSketchBuilder {
@@ -369,3 +525,155 @@ fn canonical_double(value: f64) -> i64 {
         (value + 0.0).to_bits() as i64
     }
 }
+
+#[cfg(test)]
+mod compact_tests {
+    use super::*;
+
+    fn to_big_endian_v3(mut bytes: Vec<u8>) -> Vec<u8> {
+        // Set big-endian flag bit
+        bytes[5] |= crate::theta::serialization::FLAGS_IS_BIG_ENDIAN;
+        let pre_longs = bytes[0] & 0x3f;
+
+        // seed hash (bytes 6..8)
+        bytes[6..8].reverse();
+
+        match pre_longs {
+            1 => {
+                if bytes.len() == 16 {
+                    bytes[8..16].reverse();
+                }
+            }
+            2 => {
+                bytes[8..12].reverse(); // curCount
+                bytes[12..16].reverse(); // p float
+                for chunk in bytes[16..].chunks_exact_mut(8) {
+                    chunk.reverse();
+                }
+            }
+            3 => {
+                bytes[8..12].reverse(); // curCount
+                bytes[12..16].reverse(); // p float
+                bytes[16..24].reverse(); // theta
+                for chunk in bytes[24..].chunks_exact_mut(8) {
+                    chunk.reverse();
+                }
+            }
+            _ => {}
+        }
+
+        bytes
+    }
+
+    #[test]
+    fn compact_empty_sampling_corrects_theta_on_serialize() {
+        let sketch = ThetaSketch::builder().sampling_probability(0.5).build();
+        let compact = sketch.compact(true);
+        assert!(compact.is_empty());
+        assert_eq!(compact.theta64(), MAX_THETA);
+
+        let bytes = compact.serialize();
+        assert_eq!(bytes.len(), 8);
+        assert_eq!(bytes[0] & 0x3f, 1);
+        assert!((bytes[5] & crate::theta::serialization::FLAGS_IS_EMPTY) != 0);
+        assert!((bytes[5] & crate::theta::serialization::FLAGS_IS_COMPACT) != 0);
+        assert!((bytes[5] & crate::theta::serialization::FLAGS_IS_READ_ONLY) != 0);
+
+        let decoded = CompactThetaSketch::deserialize(&bytes).unwrap();
+        assert!(decoded.is_empty());
+        assert_eq!(decoded.theta64(), MAX_THETA);
+        assert_eq!(decoded.num_retained(), 0);
+    }
+
+    #[test]
+    fn compact_single_item_round_trip() {
+        let mut sketch = ThetaSketch::builder().build();
+        sketch.update("apple");
+        let compact = sketch.compact(true);
+        assert!(!compact.is_empty());
+        assert_eq!(compact.num_retained(), 1);
+        assert_eq!(compact.theta64(), MAX_THETA);
+
+        let bytes = compact.serialize();
+        assert_eq!(bytes.len(), 16);
+        assert_eq!(bytes[0] & 0x3f, 1);
+        assert!((bytes[5] & crate::theta::serialization::FLAGS_IS_SINGLE_ITEM) != 0);
+
+        let decoded = CompactThetaSketch::deserialize(&bytes).unwrap();
+        assert!(!decoded.is_empty());
+        assert_eq!(decoded.num_retained(), 1);
+        assert!(decoded.is_ordered());
+        assert_eq!(decoded.theta64(), MAX_THETA);
+        assert_eq!(decoded.estimate(), 1.0);
+    }
+
+    #[test]
+    fn compact_estimation_round_trip_ordered() {
+        let mut sketch = ThetaSketch::builder().lg_k(5).build();
+        for i in 0..200 {
+            sketch.update(i);
+        }
+        assert!(sketch.is_estimation_mode());
+
+        let compact = sketch.compact(true);
+        assert!(compact.is_estimation_mode());
+        assert!(compact.is_ordered());
+
+        let bytes = compact.serialize();
+        assert_eq!(bytes[0] & 0x3f, 3);
+
+        let decoded = CompactThetaSketch::deserialize(&bytes).unwrap();
+        assert!(decoded.is_estimation_mode());
+        assert!(decoded.is_ordered());
+        assert_eq!(decoded.num_retained(), compact.num_retained());
+        assert_eq!(decoded.theta64(), compact.theta64());
+        assert!(decoded.iter().eq(compact.iter()));
+    }
+
+    #[test]
+    fn deserialize_big_endian_v3() {
+        let mut sketch = ThetaSketch::builder().lg_k(5).build();
+        for i in 0..200 {
+            sketch.update(i);
+        }
+        let compact = sketch.compact(true);
+        let bytes_le = compact.serialize();
+        let bytes_be = to_big_endian_v3(bytes_le);
+
+        let decoded = CompactThetaSketch::deserialize(&bytes_be).unwrap();
+        assert_eq!(decoded.num_retained(), compact.num_retained());
+        assert_eq!(decoded.theta64(), compact.theta64());
+        assert!(decoded.iter().eq(compact.iter()));
+    }
+
+    #[test]
+    fn deserialize_rejects_seed_hash_mismatch() {
+        let mut sketch = ThetaSketch::builder().seed(7).build();
+        sketch.update("apple");
+        let bytes = sketch.compact(true).serialize();
+
+        let err =
+            CompactThetaSketch::deserialize_with_seed(&bytes, DEFAULT_UPDATE_SEED).unwrap_err();
+        assert_eq!(err.kind(), crate::error::ErrorKind::InvalidData);
+        assert!(err.message().contains("incompatible seed hash"));
+    }
+
+    #[test]
+    fn compact_serialize_compressed_uses_v4_when_suitable() {
+        let mut sketch = ThetaSketch::builder().lg_k(10).build();
+        for i in 0..1000 {
+            sketch.update(i);
+        }
+        let compact = sketch.compact(true);
+        assert!(compact.is_ordered());
+
+        let bytes = compact.serialize_compressed();
+        assert_eq!(bytes[1], crate::theta::serialization::SERIAL_VERSION_V4);
+
+        let decoded = CompactThetaSketch::deserialize(&bytes).unwrap();
+        assert_eq!(decoded.num_retained(), compact.num_retained());
+        assert_eq!(decoded.theta64(), compact.theta64());
+        assert!(decoded.iter().eq(compact.iter()));
+        assert!(decoded.is_ordered());
+    }
+}