Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All significant changes to this project will be documented in this file.

* `CountMinSketch` with unsigned values now supports `halve` and `decay` operations.
* `CpcSketch` and `CpcUnion` are now available for cardinality estimation.
* `FrequentItemsSketch` now supports serialization and deserialization for `u64` items.

## v0.2.0 (2026-01-14)

Expand Down
87 changes: 43 additions & 44 deletions datasketches/src/frequencies/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::hash::Hash;

use crate::codec::SketchBytes;
use crate::codec::SketchSlice;
use crate::error::Error;
Expand All @@ -32,66 +34,63 @@ pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4;
/// Empty flag mask (both bits for compatibility).
pub const EMPTY_FLAG_MASK: u8 = 5;

pub(crate) fn count_string_items_bytes(items: &[String]) -> usize {
items.iter().map(|item| 4 + item.len()).sum()
/// Trait for serializing and deserializing frequent item values.
///
/// A type implementing this trait can be written to a [`SketchBytes`] buffer and read back from
/// a [`SketchSlice`] cursor one item at a time.
pub trait FrequentItemValue: Sized + Eq + Hash + Clone {
/// Returns the size in bytes required to serialize the given item.
///
/// This is an associated function rather than a method, so it is called as
/// `T::serialize_size(&item)`.
fn serialize_size(item: &Self) -> usize;
/// Serializes the item into the given byte buffer.
fn serialize_value(&self, bytes: &mut SketchBytes);
/// Deserializes an item from the given byte cursor.
///
/// # Errors
///
/// Returns an [`Error`] when an item cannot be read from `cursor`.
fn deserialize_value(cursor: &mut SketchSlice<'_>) -> Result<Self, Error>;
}

pub(crate) fn serialize_string_items(bytes: &mut SketchBytes, items: &[String]) {
for item in items {
let bs = item.as_bytes();
impl FrequentItemValue for String {
fn serialize_size(item: &Self) -> usize {
size_of::<u32>() + item.len()
}

fn serialize_value(&self, bytes: &mut SketchBytes) {
let bs = self.as_bytes();
bytes.write_u32_le(bs.len() as u32);
bytes.write(bs);
}
}

pub(crate) fn deserialize_string_items(
mut cursor: SketchSlice<'_>,
num_items: usize,
) -> Result<Vec<String>, Error> {
let mut items = Vec::with_capacity(num_items);
for i in 0..num_items {
fn deserialize_value(cursor: &mut SketchSlice<'_>) -> Result<Self, Error> {
let len = cursor.read_u32_le().map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} string items, failed to read len at index {i}"
))
Error::insufficient_data("failed to read string item length".to_string())
})?;

let mut slice = vec![0; len as usize];
cursor.read_exact(&mut slice).map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} string items, failed to read slice at index {i}"
))
Error::insufficient_data("failed to read string item bytes".to_string())
})?;

let value = String::from_utf8(slice)
.map_err(|_| Error::deserial(format!("invalid UTF-8 string payload at index {i}")))?;
items.push(value);
String::from_utf8(slice)
.map_err(|_| Error::deserial("invalid UTF-8 string payload".to_string()))
}
Ok(items)
}

pub(crate) fn count_i64_items_bytes(items: &[i64]) -> usize {
items.len() * 8
}
macro_rules! impl_primitive {
($name:ty, $read:ident, $write:ident) => {
impl FrequentItemValue for $name {
fn serialize_size(_item: &Self) -> usize {
size_of::<$name>()
}

pub(crate) fn serialize_i64_items(bytes: &mut SketchBytes, items: &[i64]) {
for item in items.iter().copied() {
bytes.write_i64_le(item);
}
}
fn serialize_value(&self, bytes: &mut SketchBytes) {
bytes.$write(*self);
}

pub(crate) fn deserialize_i64_items(
mut cursor: SketchSlice<'_>,
num_items: usize,
) -> Result<Vec<i64>, Error> {
let mut items = Vec::with_capacity(num_items);
for i in 0..num_items {
let value = cursor.read_i64_le().map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} i64 items, failed at index {i}"
))
})?;
items.push(value);
}
Ok(items)
fn deserialize_value(cursor: &mut SketchSlice<'_>) -> Result<Self, Error> {
cursor.$read().map_err(|_| {
Error::insufficient_data(
concat!("failed to read ", stringify!($name), " item bytes").to_string(),
)
})
}
}
};
}

impl_primitive!(i64, read_i64_le, write_i64_le);
impl_primitive!(u64, read_u64_le, write_u64_le);
119 changes: 102 additions & 17 deletions datasketches/src/frequencies/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,12 @@ impl<T> Row<T> {
self.estimate
}

/// Returns the upper bound for the frequency.
/// Returns the guaranteed upper bound for the frequency.
pub fn upper_bound(&self) -> u64 {
self.upper_bound
}

/// Returns the guaranteed lower bound for the frequency.
///
/// This value is never negative.
pub fn lower_bound(&self) -> u64 {
self.lower_bound
}
Expand Down Expand Up @@ -115,7 +113,11 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
/// assert_eq!(sketch.num_active_items(), 2);
/// ```
pub fn new(max_map_size: usize) -> Self {
let lg_max_map_size = exact_log2(max_map_size);
assert!(
max_map_size.is_power_of_two(),
"max_map_size must be power of 2"
);
let lg_max_map_size = max_map_size.trailing_zeros() as u8;
Self::with_lg_map_sizes(lg_max_map_size, LG_MIN_MAP_SIZE)
}

Expand Down Expand Up @@ -155,16 +157,16 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {

/// Returns the guaranteed lower bound frequency for an item.
///
/// This value is never negative and is guaranteed to be no larger than the true frequency.
/// If the item is not tracked, the lower bound is zero.
/// This value is guaranteed to be no larger than the true frequency. If the item is not
/// tracked, the lower bound is zero.
pub fn lower_bound(&self, item: &T) -> u64 {
self.hash_map.get(item)
}

/// Returns the guaranteed upper bound frequency for an item.
///
/// This value is guaranteed to be no smaller than the true frequency.
/// If the item is tracked, this is `item_count + offset`.
/// This value is guaranteed to be no smaller than the true frequency. If the item is tracked,
/// this is `item_count + offset`.
pub fn upper_bound(&self, item: &T) -> u64 {
self.hash_map.get(item) + self.offset
}
Expand Down Expand Up @@ -544,7 +546,14 @@ impl FrequentItemsSketch<i64> {
/// assert!(decoded.estimate(&7) >= 2);
/// ```
pub fn serialize(&self) -> Vec<u8> {
self.serialize_inner(count_i64_items_bytes, serialize_i64_items)
self.serialize_inner(
|items| items.iter().map(i64::serialize_size).sum(),
|bytes, items| {
for item in items {
item.serialize_value(bytes);
}
},
)
}

/// Deserializes a sketch from bytes.
Expand All @@ -560,7 +569,70 @@ impl FrequentItemsSketch<i64> {
/// assert!(decoded.estimate(&7) >= 2);
/// ```
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
Self::deserialize_inner(bytes, deserialize_i64_items)
Self::deserialize_inner(bytes, |mut cursor, num_items| {
let mut items = Vec::with_capacity(num_items);
for i in 0..num_items {
let item = i64::deserialize_value(&mut cursor).map_err(|_| {
Error::insufficient_data(format!(
"expected {num_items} items, failed to read item at index {i}"
))
})?;
items.push(item);
}
Ok(items)
})
}
}

impl FrequentItemsSketch<u64> {
    /// Serializes this sketch into a byte vector.
    ///
    /// Every tracked item is sized and written with the `u64` implementation of
    /// `FrequentItemValue`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use datasketches::frequencies::FrequentItemsSketch;
    /// # let mut sketch = FrequentItemsSketch::<u64>::new(64);
    /// # sketch.update_with_count(7, 2);
    /// let bytes = sketch.serialize();
    /// let decoded = FrequentItemsSketch::<u64>::deserialize(&bytes).unwrap();
    /// assert!(decoded.estimate(&7) >= 2);
    /// ```
    pub fn serialize(&self) -> Vec<u8> {
        self.serialize_inner(
            |items| items.iter().map(u64::serialize_size).sum(),
            |bytes, items| {
                for item in items {
                    item.serialize_value(bytes);
                }
            },
        )
    }

    /// Deserializes a sketch from bytes.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when `bytes` cannot be decoded. If an item cannot be read, the
    /// error is an insufficient-data error naming the expected item count and the index of the
    /// item that failed.
    ///
    /// # Examples
    ///
    /// ```
    /// # use datasketches::frequencies::FrequentItemsSketch;
    /// # let mut sketch = FrequentItemsSketch::<u64>::new(64);
    /// # sketch.update_with_count(7, 2);
    /// # let bytes = sketch.serialize();
    /// let decoded = FrequentItemsSketch::<u64>::deserialize(&bytes).unwrap();
    /// assert!(decoded.estimate(&7) >= 2);
    /// ```
    pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
        Self::deserialize_inner(bytes, |mut cursor, num_items| {
            let mut items = Vec::with_capacity(num_items);
            for i in 0..num_items {
                // Replace the per-item message with one that carries the position, so a
                // truncated buffer can be diagnosed from the error alone.
                let item = u64::deserialize_value(&mut cursor).map_err(|_| {
                    Error::insufficient_data(format!(
                        "expected {num_items} items, failed to read item at index {i}"
                    ))
                })?;
                items.push(item);
            }
            Ok(items)
        })
    }
}

Expand All @@ -579,7 +651,14 @@ impl FrequentItemsSketch<String> {
/// assert!(decoded.estimate(&apple) >= 2);
/// ```
pub fn serialize(&self) -> Vec<u8> {
self.serialize_inner(count_string_items_bytes, serialize_string_items)
self.serialize_inner(
|items| items.iter().map(String::serialize_size).sum(),
|bytes, items| {
for item in items {
item.serialize_value(bytes);
}
},
)
}

/// Deserializes a sketch from bytes.
Expand All @@ -596,11 +675,17 @@ impl FrequentItemsSketch<String> {
/// assert!(decoded.estimate(&apple) >= 2);
/// ```
///
/// # Errors
///
/// Returns an [`Error`] when `bytes` cannot be decoded. Item-level failures keep the kind
/// reported by `String::deserialize_value`: truncated input is reported as insufficient data,
/// while a payload that is not valid UTF-8 is reported as a deserialization error.
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
    Self::deserialize_inner(bytes, |mut cursor, num_items| {
        let mut items = Vec::with_capacity(num_items);
        for _ in 0..num_items {
            // Propagate the item error untouched: re-wrapping everything as
            // `insufficient_data` would misreport invalid UTF-8 as truncated input.
            items.push(String::deserialize_value(&mut cursor)?);
        }
        Ok(items)
    })
}
}

fn exact_log2(value: usize) -> u8 {
assert!(value.is_power_of_two(), "value must be power of 2");
value.trailing_zeros() as u8
}
4 changes: 2 additions & 2 deletions datasketches/tests/frequencies_update_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,13 @@ fn test_longs_reset() {
}

#[test]
#[should_panic(expected = "value must be power of 2")]
#[should_panic(expected = "max_map_size must be power of 2")]
fn test_longs_invalid_map_size_panics() {
FrequentItemsSketch::<i64>::new(6);
}

#[test]
#[should_panic(expected = "value must be power of 2")]
#[should_panic(expected = "max_map_size must be power of 2")]
fn test_items_invalid_map_size_panics() {
let _ = FrequentItemsSketch::<String>::new(6);
}