From 5c9337df4958d52355321e834368396cbf4655ee Mon Sep 17 00:00:00 2001 From: George Leung Date: Mon, 1 Dec 2025 15:20:33 -0800 Subject: [PATCH 1/9] lfg --- .../test/collapsing-merge-tree.test.ts | 280 ++++++++++ .../src/framework/python/generate.rs | 36 ++ .../src/framework/typescript/generate.rs | 26 + .../infrastructure/olap/clickhouse/queries.rs | 499 ++++++++++++++++++ packages/py-moose-lib/moose_lib/__init__.py | 4 + packages/py-moose-lib/moose_lib/blocks.py | 82 +++ .../py-moose-lib/moose_lib/dmv2/olap_table.py | 4 + packages/ts-moose-lib/src/blocks/helpers.ts | 2 + packages/ts-moose-lib/src/dmv2/internal.ts | 30 ++ .../python-tests/src/ingest/engine_tests.py | 57 ++ .../src/ingest/engineTests.ts | 55 ++ 11 files changed, 1075 insertions(+) create mode 100644 apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts diff --git a/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts b/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts new file mode 100644 index 000000000..062f5bb16 --- /dev/null +++ b/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts @@ -0,0 +1,280 @@ +/// +/// +/// + +/** + * End-to-end tests for CollapsingMergeTree and VersionedCollapsingMergeTree engines. + * + * These tests verify that: + * 1. Tables using CollapsingMergeTree and VersionedCollapsingMergeTree engines are created correctly + * 2. Both regular and replicated variants work properly + * 3. The sign and version parameters are correctly passed to ClickHouse + */ + +import { spawn, ChildProcess } from "child_process"; +import { expect } from "chai"; +import * as path from "path"; + +// Import constants and utilities +import { TIMEOUTS, TEMPLATE_NAMES, APP_NAMES } from "./constants"; + +import { + waitForServerStart, + waitForInfrastructureReady, + cleanupTestSuite, + performGlobalCleanup, + createTempTestDirectory, + setupTypeScriptProject, + setupPythonProject, + getTableDDL, +} from "./utils"; + +const CLI_PATH = path.resolve(__dirname, "../../../target/debug/moose-cli"); +const MOOSE_LIB_PATH = path.resolve( + __dirname, + "../../../packages/ts-moose-lib", +); +const MOOSE_PY_LIB_PATH = path.resolve( + __dirname, + "../../../packages/py-moose-lib", +); + +const TEST_PACKAGE_MANAGER = (process.env.TEST_PACKAGE_MANAGER || "npm") as + | "npm" + | "pnpm" + | "pip"; + +describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", function () { + this.timeout(TIMEOUTS.SUITE); + + describe("TypeScript Template - CollapsingMergeTree Engines", function () { + let devProcess: ChildProcess | null = null; + let testDir: string = ""; + const appName = APP_NAMES.TYPESCRIPT_TESTS; + + before(async function () { + this.timeout(TIMEOUTS.SETUP); + console.log("\nšŸš€ Setting up TypeScript CollapsingMergeTree test...\n"); + + testDir = await createTempTestDirectory(); + console.log(`Created temporary directory: ${testDir}`); + + console.log("Setting up TypeScript project..."); + devProcess = await setupTypeScriptProject( + CLI_PATH, + testDir, + TEMPLATE_NAMES.TYPESCRIPT_TESTS, + appName, + MOOSE_LIB_PATH, + TEST_PACKAGE_MANAGER as "npm" | "pnpm", + ); + + console.log("Waiting for server to start..."); + await waitForServerStart(devProcess, TIMEOUTS.SERVER_START); + + console.log("Waiting for infrastructure to be ready..."); + await waitForInfrastructureReady(devProcess, TIMEOUTS.INFRASTRUCTURE); + + console.log("āœ… TypeScript test setup completed successfully\n"); + }); + + after(async function () { + this.timeout(TIMEOUTS.CLEANUP); + await cleanupTestSuite( + devProcess, + testDir, + "TypeScript CollapsingMergeTree test", + ); + }); + + it("should create CollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL("CollapsingMergeTreeTest", "local"); + console.log("CollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has CollapsingMergeTree engine + expect(ddl).to.include("CollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + console.log("āœ… CollapsingMergeTree table created successfully"); + }); + + it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "VersionedCollapsingMergeTreeTest", + "local", + ); + console.log("VersionedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has VersionedCollapsingMergeTree engine + expect(ddl).to.include("VersionedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + expect(ddl).to.include("`version`"); + console.log("āœ… VersionedCollapsingMergeTree table created successfully"); + }); + + it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "ReplicatedCollapsingMergeTreeTest", + "local", + ); + console.log("ReplicatedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has ReplicatedCollapsingMergeTree engine + expect(ddl).to.include("ReplicatedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + // Verify it has replication parameters (keeper path and replica name) + expect(ddl).to.match( + /ReplicatedCollapsingMergeTree\([^)]*replicated_collapsing_test[^)]*\)/, + ); + console.log( + "āœ… ReplicatedCollapsingMergeTree table created successfully", + ); + }); + + it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "ReplicatedVersionedCollapsingMergeTreeTest", + "local", + ); + console.log("ReplicatedVersionedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has ReplicatedVersionedCollapsingMergeTree engine + expect(ddl).to.include("ReplicatedVersionedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + expect(ddl).to.include("`version`"); + // Verify it has replication parameters + expect(ddl).to.match( + /ReplicatedVersionedCollapsingMergeTree\([^)]*replicated_versioned_collapsing_test[^)]*\)/, + ); + console.log( + "āœ… ReplicatedVersionedCollapsingMergeTree table created successfully", + ); + }); + }); + + describe("Python Template - CollapsingMergeTree Engines", function () { + let devProcess: ChildProcess | null = null; + let testDir: string = ""; + const appName = APP_NAMES.PYTHON_TESTS; + + before(async function () { + this.timeout(TIMEOUTS.SETUP); + console.log("\nšŸš€ Setting up Python CollapsingMergeTree test...\n"); + + testDir = await createTempTestDirectory(); + console.log(`Created temporary directory: ${testDir}`); + + console.log("Setting up Python project..."); + devProcess = await setupPythonProject( + CLI_PATH, + testDir, + TEMPLATE_NAMES.PYTHON_TESTS, + appName, + MOOSE_PY_LIB_PATH, + TEST_PACKAGE_MANAGER as "pip", + ); + + console.log("Waiting for server to start..."); + await waitForServerStart(devProcess, TIMEOUTS.SERVER_START); + + console.log("Waiting for infrastructure to be ready..."); + await waitForInfrastructureReady(devProcess, TIMEOUTS.INFRASTRUCTURE); + + console.log("āœ… Python test setup completed successfully\n"); + }); + + after(async function () { + this.timeout(TIMEOUTS.CLEANUP); + await cleanupTestSuite( + devProcess, + testDir, + "Python CollapsingMergeTree test", + ); + }); + + it("should create CollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL("CollapsingMergeTreeTest", "local"); + console.log("CollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has CollapsingMergeTree engine + expect(ddl).to.include("CollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + console.log("āœ… CollapsingMergeTree table created successfully"); + }); + + it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "VersionedCollapsingMergeTreeTest", + "local", + ); + console.log("VersionedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has VersionedCollapsingMergeTree engine + expect(ddl).to.include("VersionedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + expect(ddl).to.include("`version`"); + console.log("āœ… VersionedCollapsingMergeTree table created successfully"); + }); + + it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "ReplicatedCollapsingMergeTreeTest", + "local", + ); + console.log("ReplicatedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has ReplicatedCollapsingMergeTree engine + expect(ddl).to.include("ReplicatedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + // Verify it has replication parameters + expect(ddl).to.match( + /ReplicatedCollapsingMergeTree\([^)]*replicated_collapsing_test[^)]*\)/, + ); + console.log( + "āœ… ReplicatedCollapsingMergeTree table created successfully", + ); + }); + + it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () { + this.timeout(TIMEOUTS.TEST); + + const ddl = await getTableDDL( + "ReplicatedVersionedCollapsingMergeTreeTest", + "local", + ); + console.log("ReplicatedVersionedCollapsingMergeTreeTest DDL:", ddl); + + // Verify the table exists and has ReplicatedVersionedCollapsingMergeTree engine + expect(ddl).to.include("ReplicatedVersionedCollapsingMergeTree"); + expect(ddl).to.include("`sign`"); + expect(ddl).to.include("`version`"); + // Verify it has replication parameters + expect(ddl).to.match( + /ReplicatedVersionedCollapsingMergeTree\([^)]*replicated_versioned_collapsing_test[^)]*\)/, + ); + console.log( + "āœ… ReplicatedVersionedCollapsingMergeTree table created successfully", + ); + }); + }); + + after(async function () { + this.timeout(TIMEOUTS.CLEANUP); + await performGlobalCleanup( + "CollapsingMergeTree and VersionedCollapsingMergeTree engine tests", + ); + }); +}); diff --git a/apps/framework-cli/src/framework/python/generate.rs b/apps/framework-cli/src/framework/python/generate.rs index e4362b697..62b60ab55 100644 --- a/apps/framework-cli/src/framework/python/generate.rs +++ b/apps/framework-cli/src/framework/python/generate.rs @@ -803,6 +803,12 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri } writeln!(output, "),").unwrap(); } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::CollapsingMergeTree { sign } => { + writeln!(output, " engine=CollapsingMergeTreeEngine(sign={:?}),", sign).unwrap(); + } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + writeln!(output, " engine=VersionedCollapsingMergeTreeEngine(sign={:?}, version={:?}),", sign, version).unwrap(); + } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name, @@ -861,6 +867,36 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri write!(output, "{}", params.join(", ")).unwrap(); writeln!(output, "),").unwrap(); } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + } => { + write!(output, " engine=ReplicatedCollapsingMergeTreeEngine(").unwrap(); + let mut params = vec![]; + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + params.push(format!("keeper_path={:?}, replica_name={:?}", path, name)); + } + params.push(format!("sign={:?}", sign)); + write!(output, "{}", params.join(", ")).unwrap(); + writeln!(output, "),").unwrap(); + } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + } => { + write!(output, " engine=ReplicatedVersionedCollapsingMergeTreeEngine(").unwrap(); + let mut params = vec![]; + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + params.push(format!("keeper_path={:?}, replica_name={:?}", path, name)); + } + params.push(format!("sign={:?}", sign)); + params.push(format!("version={:?}", version)); + write!(output, "{}", params.join(", ")).unwrap(); + writeln!(output, "),").unwrap(); + } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::S3 { path, format, diff --git a/apps/framework-cli/src/framework/typescript/generate.rs b/apps/framework-cli/src/framework/typescript/generate.rs index b5628cd29..d2f79f02d 100644 --- a/apps/framework-cli/src/framework/typescript/generate.rs +++ b/apps/framework-cli/src/framework/typescript/generate.rs @@ -712,6 +712,15 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> } } } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::CollapsingMergeTree { sign } => { + writeln!(output, " engine: ClickHouseEngines.CollapsingMergeTree,").unwrap(); + writeln!(output, " sign: {:?},", sign).unwrap(); + } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + writeln!(output, " engine: ClickHouseEngines.VersionedCollapsingMergeTree,").unwrap(); + writeln!(output, " sign: {:?},", sign).unwrap(); + writeln!(output, " version: {:?},", version).unwrap(); + } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name } => { writeln!(output, " engine: ClickHouseEngines.ReplicatedMergeTree,").unwrap(); if let (Some(path), Some(name)) = (keeper_path, replica_name) { @@ -752,6 +761,23 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> } } } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedCollapsingMergeTree { keeper_path, replica_name, sign } => { + writeln!(output, " engine: ClickHouseEngines.ReplicatedCollapsingMergeTree,").unwrap(); + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + writeln!(output, " keeperPath: {:?},", path).unwrap(); + writeln!(output, " replicaName: {:?},", name).unwrap(); + } + writeln!(output, " sign: {:?},", sign).unwrap(); + } + crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { keeper_path, replica_name, sign, version } => { + writeln!(output, " engine: ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree,").unwrap(); + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + writeln!(output, " keeperPath: {:?},", path).unwrap(); + writeln!(output, " replicaName: {:?},", name).unwrap(); + } + writeln!(output, " sign: {:?},", sign).unwrap(); + writeln!(output, " version: {:?},", version).unwrap(); + } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::S3 { path, format, diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs index 6de287a77..af5e9a79d 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs @@ -245,6 +245,16 @@ pub enum ClickhouseEngine { // Optional list of columns to sum columns: Option>, }, + CollapsingMergeTree { + // Sign column name indicating row type (1 = state, -1 = cancel) + sign: String, + }, + VersionedCollapsingMergeTree { + // Sign column name indicating row type (1 = state, -1 = cancel) + sign: String, + // Version column name for object state versioning + version: String, + }, ReplicatedMergeTree { // Keeper path for replication (ZooKeeper or ClickHouse Keeper) // Optional: omit for ClickHouse Cloud which manages replication automatically @@ -283,6 +293,28 @@ pub enum ClickhouseEngine { // Optional list of columns to sum columns: Option>, }, + ReplicatedCollapsingMergeTree { + // Keeper path for replication (ZooKeeper or ClickHouse Keeper) + // Optional: omit for ClickHouse Cloud which manages replication automatically + keeper_path: Option, + // Replica name + // Optional: omit for ClickHouse Cloud which manages replication automatically + replica_name: Option, + // Sign column name indicating row type (1 = state, -1 = cancel) + sign: String, + }, + ReplicatedVersionedCollapsingMergeTree { + // Keeper path for replication (ZooKeeper or ClickHouse Keeper) + // Optional: omit for ClickHouse Cloud which manages replication automatically + keeper_path: Option, + // Replica name + // Optional: omit for ClickHouse Cloud which manages replication automatically + replica_name: Option, + // Sign column name indicating row type (1 = state, -1 = cancel) + sign: String, + // Version column name for object state versioning + version: String, + }, S3Queue { // Non-alterable constructor parameters - required for table creation s3_path: String, @@ -350,6 +382,12 @@ impl Into for ClickhouseEngine { ClickhouseEngine::SummingMergeTree { columns } => { Self::serialize_summing_merge_tree(&columns) } + ClickhouseEngine::CollapsingMergeTree { sign } => { + Self::serialize_collapsing_merge_tree(&sign) + } + ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + Self::serialize_versioned_collapsing_merge_tree(&sign, &version) + } ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name, @@ -376,6 +414,24 @@ impl Into for ClickhouseEngine { } => { Self::serialize_replicated_summing_merge_tree(&keeper_path, &replica_name, &columns) } + ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + } => { + Self::serialize_replicated_collapsing_merge_tree(&keeper_path, &replica_name, &sign) + } + ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + } => Self::serialize_replicated_versioned_collapsing_merge_tree( + &keeper_path, + &replica_name, + &sign, + &version, + ), ClickhouseEngine::S3Queue { s3_path, format, @@ -487,6 +543,22 @@ impl ClickhouseEngine { return Some(Self::parse_distributed_summing_merge_tree(value)); } + // Handle SharedCollapsingMergeTree and ReplicatedCollapsingMergeTree + if value.starts_with("SharedCollapsingMergeTree(") + || value.starts_with("ReplicatedCollapsingMergeTree(") + { + return Some(Self::parse_distributed_collapsing_merge_tree(value)); + } + + // Handle SharedVersionedCollapsingMergeTree and ReplicatedVersionedCollapsingMergeTree + if value.starts_with("SharedVersionedCollapsingMergeTree(") + || value.starts_with("ReplicatedVersionedCollapsingMergeTree(") + { + return Some(Self::parse_distributed_versioned_collapsing_merge_tree( + value, + )); + } + None } @@ -744,6 +816,140 @@ impl ClickhouseEngine { } } + /// Parse SharedCollapsingMergeTree or ReplicatedCollapsingMergeTree + /// Format: (path, replica, sign) or (sign) for automatic configuration + fn parse_distributed_collapsing_merge_tree(value: &str) -> Result { + let content = Self::extract_engine_content( + value, + &[ + "SharedCollapsingMergeTree(", + "ReplicatedCollapsingMergeTree(", + ], + )?; + + let params = parse_quoted_csv(content); + + // Check if this is a Replicated variant (not Shared) + let is_replicated = value.starts_with("ReplicatedCollapsingMergeTree("); + + if is_replicated { + // For Replicated variant, we need either: + // - 1 param: sign (cloud mode) + // - 3 params: keeper_path, replica_name, sign + if params.is_empty() { + return Err(value); + } + + if params.len() == 1 { + // Cloud mode: only sign parameter + return Ok(ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path: None, + replica_name: None, + sign: params[0].clone(), + }); + } + + if params.len() != 3 { + return Err(value); + } + + // Full parameters: keeper_path, replica_name, sign + let keeper_path = params.first().cloned(); + let replica_name = params.get(1).cloned(); + let sign = params[2].clone(); + + // Normalize defaults back to None + let (keeper_path, replica_name) = + Self::normalize_replication_params(keeper_path, replica_name); + + Ok(ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + }) + } else { + // For SharedCollapsingMergeTree, we need 3 params: keeper_path, replica_name, sign + if params.len() != 3 { + return Err(value); + } + + // SharedCollapsingMergeTree normalizes to CollapsingMergeTree + // Skip the first two params (keeper_path and replica_name) + Ok(ClickhouseEngine::CollapsingMergeTree { + sign: params[2].clone(), + }) + } + } + + /// Parse SharedVersionedCollapsingMergeTree or ReplicatedVersionedCollapsingMergeTree + /// Format: (path, replica, sign, version) or (sign, version) for automatic configuration + fn parse_distributed_versioned_collapsing_merge_tree(value: &str) -> Result { + let content = Self::extract_engine_content( + value, + &[ + "SharedVersionedCollapsingMergeTree(", + "ReplicatedVersionedCollapsingMergeTree(", + ], + )?; + + let params = parse_quoted_csv(content); + + // Check if this is a Replicated variant (not Shared) + let is_replicated = value.starts_with("ReplicatedVersionedCollapsingMergeTree("); + + if is_replicated { + // For Replicated variant, we need either: + // - 2 params: sign, version (cloud mode) + // - 4 params: keeper_path, replica_name, sign, version + if params.is_empty() { + return Err(value); + } + + if params.len() == 2 { + // Cloud mode: only sign and version parameters + return Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path: None, + replica_name: None, + sign: params[0].clone(), + version: params[1].clone(), + }); + } + + if params.len() != 4 { + return Err(value); + } + + // Full parameters: keeper_path, replica_name, sign, version + let keeper_path = params.first().cloned(); + let replica_name = params.get(1).cloned(); + let sign = params[2].clone(); + let version = params[3].clone(); + + // Normalize defaults back to None + let (keeper_path, replica_name) = + Self::normalize_replication_params(keeper_path, replica_name); + + Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + }) + } else { + // For SharedVersionedCollapsingMergeTree, we need 4 params: keeper_path, replica_name, sign, version + if params.len() != 4 { + return Err(value); + } + + // SharedVersionedCollapsingMergeTree normalizes to VersionedCollapsingMergeTree + // Skip the first two params (keeper_path and replica_name) + Ok(ClickhouseEngine::VersionedCollapsingMergeTree { + sign: params[2].clone(), + version: params[3].clone(), + }) + } + } + /// Extract content from engine string with given prefixes /// Returns the content within parentheses fn extract_engine_content<'a>(value: &'a str, prefixes: &[&str]) -> Result<&'a str, &'a str> { @@ -813,6 +1019,12 @@ impl ClickhouseEngine { s if s.starts_with("SummingMergeTree(") => { Self::parse_regular_summing_merge_tree(s, value) } + s if s.starts_with("CollapsingMergeTree(") => { + Self::parse_regular_collapsing_merge_tree(s, value) + } + s if s.starts_with("VersionedCollapsingMergeTree(") => { + Self::parse_regular_versioned_collapsing_merge_tree(s, value) + } s if s.starts_with("S3Queue(") => Self::parse_regular_s3queue(s, value), s if s.starts_with("S3(") => Self::parse_regular_s3(s, value), s if s.starts_with("Buffer(") => Self::parse_regular_buffer(s, value), @@ -977,6 +1189,36 @@ impl ClickhouseEngine { } } + /// Parse regular CollapsingMergeTree with parameters + fn parse_regular_collapsing_merge_tree<'a>( + engine_name: &str, + original_value: &'a str, + ) -> Result { + if let Some(content) = engine_name + .strip_prefix("CollapsingMergeTree(") + .and_then(|s| s.strip_suffix(")")) + { + Self::parse_collapsing_merge_tree(content).map_err(|_| original_value) + } else { + Err(original_value) + } + } + + /// Parse regular VersionedCollapsingMergeTree with parameters + fn parse_regular_versioned_collapsing_merge_tree<'a>( + engine_name: &str, + original_value: &'a str, + ) -> Result { + if let Some(content) = engine_name + .strip_prefix("VersionedCollapsingMergeTree(") + .and_then(|s| s.strip_suffix(")")) + { + Self::parse_versioned_collapsing_merge_tree(content).map_err(|_| original_value) + } else { + Err(original_value) + } + } + /// Parse regular Buffer with parameters /// Format: Buffer('db', 'table', num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time][, flush_rows][, flush_bytes]) fn parse_regular_buffer<'a>( @@ -1157,10 +1399,14 @@ impl ClickhouseEngine { | ClickhouseEngine::ReplacingMergeTree { .. } | ClickhouseEngine::AggregatingMergeTree | ClickhouseEngine::SummingMergeTree { .. } + | ClickhouseEngine::CollapsingMergeTree { .. } + | ClickhouseEngine::VersionedCollapsingMergeTree { .. } | ClickhouseEngine::ReplicatedMergeTree { .. } | ClickhouseEngine::ReplicatedReplacingMergeTree { .. } | ClickhouseEngine::ReplicatedAggregatingMergeTree { .. } | ClickhouseEngine::ReplicatedSummingMergeTree { .. } + | ClickhouseEngine::ReplicatedCollapsingMergeTree { .. } + | ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { .. } ) } @@ -1182,6 +1428,12 @@ impl ClickhouseEngine { ClickhouseEngine::SummingMergeTree { columns } => { Self::serialize_summing_merge_tree(columns) } + ClickhouseEngine::CollapsingMergeTree { sign } => { + Self::serialize_collapsing_merge_tree(sign) + } + ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + Self::serialize_versioned_collapsing_merge_tree(sign, version) + } ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name, @@ -1206,6 +1458,22 @@ impl ClickhouseEngine { replica_name, columns, } => Self::serialize_replicated_summing_merge_tree(keeper_path, replica_name, columns), + ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + } => Self::serialize_replicated_collapsing_merge_tree(keeper_path, replica_name, sign), + ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + } => Self::serialize_replicated_versioned_collapsing_merge_tree( + keeper_path, + replica_name, + sign, + version, + ), ClickhouseEngine::S3Queue { s3_path, format, @@ -1615,6 +1883,18 @@ impl ClickhouseEngine { "SummingMergeTree".to_string() } + /// Serialize CollapsingMergeTree engine to string format + /// Format: CollapsingMergeTree('sign') + fn serialize_collapsing_merge_tree(sign: &str) -> String { + format!("CollapsingMergeTree('{}')", sign) + } + + /// Serialize VersionedCollapsingMergeTree engine to string format + /// Format: VersionedCollapsingMergeTree('sign', 'version') + fn serialize_versioned_collapsing_merge_tree(sign: &str, version: &str) -> String { + format!("VersionedCollapsingMergeTree('{}', '{}')", sign, version) + } + /// Serialize ReplicatedMergeTree engine to string format /// Format: ReplicatedMergeTree('keeper_path', 'replica_name') or ReplicatedMergeTree() for cloud fn serialize_replicated_merge_tree( @@ -1705,6 +1985,49 @@ impl ClickhouseEngine { } } + /// Serialize ReplicatedCollapsingMergeTree engine to string format + /// Format: ReplicatedCollapsingMergeTree('keeper_path', 'replica_name', 'sign') or ReplicatedCollapsingMergeTree('sign') for cloud + fn serialize_replicated_collapsing_merge_tree( + keeper_path: &Option, + replica_name: &Option, + sign: &str, + ) -> String { + let mut params = vec![]; + + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + params.push(format!("'{}'", path)); + params.push(format!("'{}'", name)); + } + + params.push(format!("'{}'", sign)); + + format!("ReplicatedCollapsingMergeTree({})", params.join(", ")) + } + + /// Serialize ReplicatedVersionedCollapsingMergeTree engine to string format + /// Format: ReplicatedVersionedCollapsingMergeTree('keeper_path', 'replica_name', 'sign', 'version') or ReplicatedVersionedCollapsingMergeTree('sign', 'version') for cloud + fn serialize_replicated_versioned_collapsing_merge_tree( + keeper_path: &Option, + replica_name: &Option, + sign: &str, + version: &str, + ) -> String { + let mut params = vec![]; + + if let (Some(path), Some(name)) = (keeper_path, replica_name) { + params.push(format!("'{}'", path)); + params.push(format!("'{}'", name)); + } + + params.push(format!("'{}'", sign)); + params.push(format!("'{}'", version)); + + format!( + "ReplicatedVersionedCollapsingMergeTree({})", + params.join(", ") + ) + } + /// Parse ReplacingMergeTree engine from serialized string format /// Expected format: ReplacingMergeTree('ver'[, 'is_deleted']) fn parse_replacing_merge_tree(content: &str) -> Result { @@ -1739,6 +2062,37 @@ impl ClickhouseEngine { Ok(ClickhouseEngine::SummingMergeTree { columns }) } + /// Parse CollapsingMergeTree engine from serialized string format + /// Expected format: CollapsingMergeTree('sign') + fn parse_collapsing_merge_tree(content: &str) -> Result { + let parts = parse_quoted_csv(content); + + if parts.len() != 1 { + return Err("CollapsingMergeTree requires exactly one parameter: sign column"); + } + + Ok(ClickhouseEngine::CollapsingMergeTree { + sign: parts[0].clone(), + }) + } + + /// Parse VersionedCollapsingMergeTree engine from serialized string format + /// Expected format: VersionedCollapsingMergeTree('sign', 'version') + fn parse_versioned_collapsing_merge_tree(content: &str) -> Result { + let parts = parse_quoted_csv(content); + + if parts.len() != 2 { + return Err( + "VersionedCollapsingMergeTree requires exactly two parameters: sign and version columns", + ); + } + + Ok(ClickhouseEngine::VersionedCollapsingMergeTree { + sign: parts[0].clone(), + version: parts[1].clone(), + }) + } + /// Parse S3Queue engine from serialized string format /// Expected format: S3Queue('path', 'format'[, 'compression'][, 'headers_json']) fn parse_s3queue(content: &str) -> Result { @@ -1958,6 +2312,15 @@ impl ClickhouseEngine { hasher.update("null".as_bytes()); } } + ClickhouseEngine::CollapsingMergeTree { sign } => { + hasher.update("CollapsingMergeTree".as_bytes()); + hasher.update(sign.as_bytes()); + } + ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + hasher.update("VersionedCollapsingMergeTree".as_bytes()); + hasher.update(sign.as_bytes()); + hasher.update(version.as_bytes()); + } ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name, @@ -2042,6 +2405,44 @@ impl ClickhouseEngine { hasher.update("null".as_bytes()); } } + ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + } => { + hasher.update("ReplicatedCollapsingMergeTree".as_bytes()); + if let Some(path) = keeper_path { + hasher.update(path.as_bytes()); + } else { + hasher.update("null".as_bytes()); + } + if let Some(name) = replica_name { + hasher.update(name.as_bytes()); + } else { + hasher.update("null".as_bytes()); + } + hasher.update(sign.as_bytes()); + } + ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + } => { + hasher.update("ReplicatedVersionedCollapsingMergeTree".as_bytes()); + if let Some(path) = keeper_path { + hasher.update(path.as_bytes()); + } else { + hasher.update("null".as_bytes()); + } + if let Some(name) = replica_name { + hasher.update(name.as_bytes()); + } else { + hasher.update("null".as_bytes()); + } + hasher.update(sign.as_bytes()); + hasher.update(version.as_bytes()); + } ClickhouseEngine::S3Queue { s3_path, format, @@ -2287,6 +2688,16 @@ fn build_summing_merge_tree_ddl(columns: &Option>) -> String { "SummingMergeTree".to_string() } +/// Generate DDL for CollapsingMergeTree engine +fn build_collapsing_merge_tree_ddl(sign: &str) -> String { + format!("CollapsingMergeTree(`{}`)", sign) +} + +/// Generate DDL for VersionedCollapsingMergeTree engine +fn build_versioned_collapsing_merge_tree_ddl(sign: &str, version: &str) -> String { + format!("VersionedCollapsingMergeTree(`{}`, `{}`)", sign, version) +} + /// Build replication parameters for replicated engines /// /// When keeper_path and replica_name are None: @@ -2453,6 +2864,60 @@ fn build_replicated_summing_merge_tree_ddl( Ok(format!("ReplicatedSummingMergeTree({})", params.join(", "))) } +/// Generate DDL for ReplicatedCollapsingMergeTree engine +fn build_replicated_collapsing_merge_tree_ddl( + keeper_path: &Option, + replica_name: &Option, + cluster_name: &Option, + sign: &str, + table_name: &str, + is_dev: bool, +) -> Result { + let mut params = build_replication_params( + keeper_path, + replica_name, + cluster_name, + "ReplicatedCollapsingMergeTree", + table_name, + is_dev, + )?; + + params.push(format!("`{}`", sign)); + + Ok(format!( + "ReplicatedCollapsingMergeTree({})", + params.join(", ") + )) +} + +/// Generate DDL for ReplicatedVersionedCollapsingMergeTree engine +fn build_replicated_versioned_collapsing_merge_tree_ddl( + keeper_path: &Option, + replica_name: &Option, + cluster_name: &Option, + sign: &str, + version: &str, + table_name: &str, + is_dev: bool, +) -> Result { + let mut params = build_replication_params( + keeper_path, + replica_name, + cluster_name, + "ReplicatedVersionedCollapsingMergeTree", + table_name, + is_dev, + )?; + + params.push(format!("`{}`", sign)); + params.push(format!("`{}`", version)); + + Ok(format!( + "ReplicatedVersionedCollapsingMergeTree({})", + params.join(", ") + )) +} + pub fn create_table_query( db_name: &str, table: ClickHouseTable, @@ -2470,6 +2935,10 @@ pub fn create_table_query( )?, ClickhouseEngine::AggregatingMergeTree => "AggregatingMergeTree".to_string(), ClickhouseEngine::SummingMergeTree { columns } => build_summing_merge_tree_ddl(columns), + ClickhouseEngine::CollapsingMergeTree { sign } => build_collapsing_merge_tree_ddl(sign), + ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { + build_versioned_collapsing_merge_tree_ddl(sign, version) + } ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name, @@ -2517,6 +2986,32 @@ pub fn create_table_query( &table.name, is_dev, )?, + ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + } => build_replicated_collapsing_merge_tree_ddl( + keeper_path, + replica_name, + &table.cluster_name, + sign, + &table.name, + is_dev, + )?, + ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + } => build_replicated_versioned_collapsing_merge_tree_ddl( + keeper_path, + replica_name, + &table.cluster_name, + sign, + version, + &table.name, + is_dev, + )?, ClickhouseEngine::S3Queue { s3_path, format, @@ -2768,10 +3263,14 @@ pub fn create_table_query( | ClickhouseEngine::ReplacingMergeTree { .. } | ClickhouseEngine::AggregatingMergeTree | ClickhouseEngine::SummingMergeTree { .. } + | ClickhouseEngine::CollapsingMergeTree { .. } + | ClickhouseEngine::VersionedCollapsingMergeTree { .. } | ClickhouseEngine::ReplicatedMergeTree { .. } | ClickhouseEngine::ReplicatedReplacingMergeTree { .. } | ClickhouseEngine::ReplicatedAggregatingMergeTree { .. } | ClickhouseEngine::ReplicatedSummingMergeTree { .. } + | ClickhouseEngine::ReplicatedCollapsingMergeTree { .. } + | ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { .. } | ClickhouseEngine::S3 { .. } ); diff --git a/packages/py-moose-lib/moose_lib/__init__.py b/packages/py-moose-lib/moose_lib/__init__.py index d50c9c72a..17edecd42 100644 --- a/packages/py-moose-lib/moose_lib/__init__.py +++ b/packages/py-moose-lib/moose_lib/__init__.py @@ -21,10 +21,14 @@ ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, + CollapsingMergeTreeEngine, + VersionedCollapsingMergeTreeEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, + ReplicatedVersionedCollapsingMergeTreeEngine, S3QueueEngine, IcebergS3Engine, EngineConfig, diff --git a/packages/py-moose-lib/moose_lib/blocks.py b/packages/py-moose-lib/moose_lib/blocks.py index 7b3a69de5..ed1c72dd1 100644 --- a/packages/py-moose-lib/moose_lib/blocks.py +++ b/packages/py-moose-lib/moose_lib/blocks.py @@ -66,6 +66,36 @@ class SummingMergeTreeEngine(EngineConfig): """ columns: Optional[List[str]] = None +@dataclass +class CollapsingMergeTreeEngine(EngineConfig): + """Configuration for CollapsingMergeTree engine + + Args: + sign: Column name indicating row type (1 = state, -1 = cancel) + """ + sign: str + + def __post_init__(self): + if not self.sign: + raise ValueError("sign column is required for CollapsingMergeTree") + +@dataclass +class VersionedCollapsingMergeTreeEngine(EngineConfig): + """Configuration for VersionedCollapsingMergeTree engine + + Args: + sign: Column name indicating row type (1 = state, -1 = cancel) + version: Column name for object state versioning + """ + sign: str + version: str + + def __post_init__(self): + if not self.sign: + raise ValueError("sign column is required for VersionedCollapsingMergeTree") + if not self.version: + raise ValueError("version column is required for VersionedCollapsingMergeTree") + @dataclass class ReplicatedMergeTreeEngine(EngineConfig): """Configuration for ReplicatedMergeTree engine (replicated version of MergeTree) @@ -154,6 +184,58 @@ def __post_init__(self): if (self.keeper_path is None) != (self.replica_name is None): raise ValueError("keeper_path and replica_name must both be provided or both be None") +@dataclass +class ReplicatedCollapsingMergeTreeEngine(EngineConfig): + """Configuration for ReplicatedCollapsingMergeTree engine (replicated version with collapsing) + + Args: + keeper_path: Keeper path for replication (e.g., '/clickhouse/tables/{database}/{shard}/table_name') + Optional: omit for ClickHouse Cloud which manages replication automatically + replica_name: Replica name (e.g., '{replica}') + Optional: omit for ClickHouse Cloud which manages replication automatically + sign: Column name indicating row type (1 = state, -1 = cancel) + + Note: Both keeper_path and replica_name must be provided together, or both omitted. + """ + keeper_path: Optional[str] = None + replica_name: Optional[str] = None + sign: str = field(default=None) + + def __post_init__(self): + # Both must be provided or both must be None + if (self.keeper_path is None) != (self.replica_name is None): + raise ValueError("keeper_path and replica_name must both be provided or both be None") + if not self.sign: + raise ValueError("sign column is required for ReplicatedCollapsingMergeTree") + +@dataclass +class ReplicatedVersionedCollapsingMergeTreeEngine(EngineConfig): + """Configuration for ReplicatedVersionedCollapsingMergeTree engine (replicated version with versioned collapsing) + + Args: + keeper_path: Keeper path for replication (e.g., '/clickhouse/tables/{database}/{shard}/table_name') + Optional: omit for ClickHouse Cloud which manages replication automatically + replica_name: Replica name (e.g., '{replica}') + Optional: omit for ClickHouse Cloud which manages replication automatically + sign: Column name indicating row type (1 = state, -1 = cancel) + version: Column name for object state versioning + + Note: Both keeper_path and replica_name must be provided together, or both omitted. + """ + keeper_path: Optional[str] = None + replica_name: Optional[str] = None + sign: str = field(default=None) + version: str = field(default=None) + + def __post_init__(self): + # Both must be provided or both must be None + if (self.keeper_path is None) != (self.replica_name is None): + raise ValueError("keeper_path and replica_name must both be provided or both be None") + if not self.sign: + raise ValueError("sign column is required for ReplicatedVersionedCollapsingMergeTree") + if not self.version: + raise ValueError("version column is required for ReplicatedVersionedCollapsingMergeTree") + @dataclass class S3QueueEngine(EngineConfig): """Configuration for S3Queue engine - only non-alterable constructor parameters. diff --git a/packages/py-moose-lib/moose_lib/dmv2/olap_table.py b/packages/py-moose-lib/moose_lib/dmv2/olap_table.py index 7e50b6704..d445b17c5 100644 --- a/packages/py-moose-lib/moose_lib/dmv2/olap_table.py +++ b/packages/py-moose-lib/moose_lib/dmv2/olap_table.py @@ -294,6 +294,8 @@ def __init__(self, name: str, config: OlapConfig = OlapConfig(), **kwargs): ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, + ReplicatedVersionedCollapsingMergeTreeEngine, ) if isinstance( @@ -303,6 +305,8 @@ def __init__(self, name: str, config: OlapConfig = OlapConfig(), **kwargs): ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, + ReplicatedVersionedCollapsingMergeTreeEngine, ), ): if ( diff --git a/packages/ts-moose-lib/src/blocks/helpers.ts b/packages/ts-moose-lib/src/blocks/helpers.ts index dc2ffd563..e9234c633 100644 --- a/packages/ts-moose-lib/src/blocks/helpers.ts +++ b/packages/ts-moose-lib/src/blocks/helpers.ts @@ -51,6 +51,8 @@ export enum ClickHouseEngines { ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree", ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree", ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree", + ReplicatedCollapsingMergeTree = "ReplicatedCollapsingMergeTree", + ReplicatedVersionedCollapsingMergeTree = "ReplicatedVersionedCollapsingMergeTree", } /** diff --git a/packages/ts-moose-lib/src/dmv2/internal.ts b/packages/ts-moose-lib/src/dmv2/internal.ts index a795f1c54..dedae6b58 100644 --- a/packages/ts-moose-lib/src/dmv2/internal.ts +++ b/packages/ts-moose-lib/src/dmv2/internal.ts @@ -84,6 +84,17 @@ interface SummingMergeTreeEngineConfig { columns?: string[]; } +interface CollapsingMergeTreeEngineConfig { + engine: "CollapsingMergeTree"; + sign: string; +} + +interface VersionedCollapsingMergeTreeEngineConfig { + engine: "VersionedCollapsingMergeTree"; + sign: string; + version: string; +} + interface ReplicatedMergeTreeEngineConfig { engine: "ReplicatedMergeTree"; keeperPath?: string; @@ -111,6 +122,21 @@ interface ReplicatedSummingMergeTreeEngineConfig { columns?: string[]; } +interface ReplicatedCollapsingMergeTreeEngineConfig { + engine: "ReplicatedCollapsingMergeTree"; + keeperPath?: string; + replicaName?: string; + sign: string; +} + +interface ReplicatedVersionedCollapsingMergeTreeEngineConfig { + engine: "ReplicatedVersionedCollapsingMergeTree"; + keeperPath?: string; + replicaName?: string; + sign: string; + version: string; +} + interface S3QueueEngineConfig { engine: "S3Queue"; s3Path: string; @@ -174,10 +200,14 @@ type EngineConfig = | ReplacingMergeTreeEngineConfig | AggregatingMergeTreeEngineConfig | SummingMergeTreeEngineConfig + | CollapsingMergeTreeEngineConfig + | VersionedCollapsingMergeTreeEngineConfig | ReplicatedMergeTreeEngineConfig | ReplicatedReplacingMergeTreeEngineConfig | ReplicatedAggregatingMergeTreeEngineConfig | ReplicatedSummingMergeTreeEngineConfig + | ReplicatedCollapsingMergeTreeEngineConfig + | ReplicatedVersionedCollapsingMergeTreeEngineConfig | S3QueueEngineConfig | S3EngineConfig | BufferEngineConfig diff --git a/templates/python-tests/src/ingest/engine_tests.py b/templates/python-tests/src/ingest/engine_tests.py index a93cce64e..978f590c7 100644 --- a/templates/python-tests/src/ingest/engine_tests.py +++ b/templates/python-tests/src/ingest/engine_tests.py @@ -7,10 +7,14 @@ ReplacingMergeTreeEngine, SummingMergeTreeEngine, AggregatingMergeTreeEngine, + CollapsingMergeTreeEngine, + VersionedCollapsingMergeTreeEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, + ReplicatedVersionedCollapsingMergeTreeEngine, BufferEngine, # S3QueueEngine - requires S3 configuration, tested separately ) @@ -80,6 +84,10 @@ class FixedStringTestData(BaseModel): ), ) +class CollapsingTestData(EngineTestData): + """Test data model for CollapsingMergeTree and VersionedCollapsingMergeTree testing""" + sign: int # For CollapsingMergeTree (1 = state, -1 = cancel) + class EngineTestDataSample(BaseModel): """Test data model for engine testing""" id: str @@ -153,6 +161,24 @@ class EngineTestDataSample(BaseModel): ) ) +# Test CollapsingMergeTree engine +collapsing_merge_tree_table = OlapTable[CollapsingTestData]( + "CollapsingMergeTreeTest", + OlapConfig( + engine=CollapsingMergeTreeEngine(sign="sign"), + order_by_fields=["id", "timestamp"] + ) +) + +# Test VersionedCollapsingMergeTree engine +versioned_collapsing_merge_tree_table = OlapTable[CollapsingTestData]( + "VersionedCollapsingMergeTreeTest", + OlapConfig( + engine=VersionedCollapsingMergeTreeEngine(sign="sign", version="version"), + order_by_fields=["id", "timestamp"] + ) +) + # Test ReplicatedMergeTree engine (with explicit keeper params - for self-hosted) replicated_merge_tree_table = OlapTable[EngineTestData]( "ReplicatedMergeTreeTest", @@ -228,6 +254,33 @@ class EngineTestDataSample(BaseModel): ) ) +# Test ReplicatedCollapsingMergeTree engine +replicated_collapsing_merge_tree_table = OlapTable[CollapsingTestData]( + "ReplicatedCollapsingMergeTreeTest", + OlapConfig( + engine=ReplicatedCollapsingMergeTreeEngine( + keeper_path="/clickhouse/tables/{database}/{shard}/replicated_collapsing_test", + replica_name="{replica}", + sign="sign" + ), + order_by_fields=["id", "timestamp"] + ) +) + +# Test ReplicatedVersionedCollapsingMergeTree engine +replicated_versioned_collapsing_merge_tree_table = OlapTable[CollapsingTestData]( + "ReplicatedVersionedCollapsingMergeTreeTest", + OlapConfig( + engine=ReplicatedVersionedCollapsingMergeTreeEngine( + keeper_path="/clickhouse/tables/{database}/{shard}/replicated_versioned_collapsing_test", + replica_name="{replica}", + sign="sign", + version="version" + ), + order_by_fields=["id", "timestamp"] + ) +) + # Test SAMPLE BY clause for data sampling sample_by_table = OlapTable[EngineTestDataSample]( "SampleByTest", @@ -280,12 +333,16 @@ class EngineTestDataSample(BaseModel): replacing_merge_tree_soft_delete_table, summing_merge_tree_table, aggregating_merge_tree_table, + collapsing_merge_tree_table, + versioned_collapsing_merge_tree_table, replicated_merge_tree_table, replicated_merge_tree_cloud_table, replicated_replacing_merge_tree_table, replicated_replacing_soft_delete_table, replicated_aggregating_merge_tree_table, replicated_summing_merge_tree_table, + replicated_collapsing_merge_tree_table, + replicated_versioned_collapsing_merge_tree_table, sample_by_table, ttl_table, buffer_destination_table, diff --git a/templates/typescript-tests/src/ingest/engineTests.ts b/templates/typescript-tests/src/ingest/engineTests.ts index 0782b441c..f93eb6024 100644 --- a/templates/typescript-tests/src/ingest/engineTests.ts +++ b/templates/typescript-tests/src/ingest/engineTests.ts @@ -23,6 +23,12 @@ export interface EngineTestData { version: number; isDeleted: boolean; // For ReplacingMergeTree soft deletes (UInt8 in ClickHouse) } + +// Test data model for CollapsingMergeTree and VersionedCollapsingMergeTree testing +export interface CollapsingTestData extends EngineTestData { + sign: number; // For CollapsingMergeTree (1 = state, -1 = cancel) +} + export interface EngineTestDataSample { id: string; timestamp: DateTime; @@ -121,6 +127,25 @@ export const AggregatingMergeTreeTable = new OlapTable( }, ); +// Test CollapsingMergeTree engine +export const CollapsingMergeTreeTable = new OlapTable( + "CollapsingMergeTreeTest", + { + engine: ClickHouseEngines.CollapsingMergeTree, + sign: "sign", + orderByFields: ["id", "timestamp"], + }, +); + +// Test VersionedCollapsingMergeTree engine +export const VersionedCollapsingMergeTreeTable = + new OlapTable("VersionedCollapsingMergeTreeTest", { + engine: ClickHouseEngines.VersionedCollapsingMergeTree, + sign: "sign", + version: "version", + orderByFields: ["id", "timestamp"], + }); + // Test SummingMergeTree engine with columns export const SummingMergeTreeWithColumnsTable = new OlapTable( "SummingMergeTreeWithColumnsTest", @@ -203,6 +228,32 @@ export const ReplicatedSummingMergeTreeTable = new OlapTable( }, ); +// Test ReplicatedCollapsingMergeTree engine +export const ReplicatedCollapsingMergeTreeTable = + new OlapTable("ReplicatedCollapsingMergeTreeTest", { + engine: ClickHouseEngines.ReplicatedCollapsingMergeTree, + keeperPath: + "/clickhouse/tables/{database}/{shard}/replicated_collapsing_test", + replicaName: "{replica}", + sign: "sign", + orderByFields: ["id", "timestamp"], + }); + +// Test ReplicatedVersionedCollapsingMergeTree engine +export const ReplicatedVersionedCollapsingMergeTreeTable = + new OlapTable( + "ReplicatedVersionedCollapsingMergeTreeTest", + { + engine: ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree, + keeperPath: + "/clickhouse/tables/{database}/{shard}/replicated_versioned_collapsing_test", + replicaName: "{replica}", + sign: "sign", + version: "version", + orderByFields: ["id", "timestamp"], + }, + ); + // Test SAMPLE BY clause for data sampling export const SampleByTable = new OlapTable( "SampleByTest", @@ -254,12 +305,16 @@ export const allEngineTestTables = [ SummingMergeTreeTable, SummingMergeTreeWithColumnsTable, AggregatingMergeTreeTable, + CollapsingMergeTreeTable, + VersionedCollapsingMergeTreeTable, ReplicatedMergeTreeTable, ReplicatedMergeTreeCloudTable, ReplicatedReplacingMergeTreeTable, ReplicatedReplacingSoftDeleteTable, ReplicatedAggregatingMergeTreeTable, ReplicatedSummingMergeTreeTable, + ReplicatedCollapsingMergeTreeTable, + ReplicatedVersionedCollapsingMergeTreeTable, SampleByTable, TTLTable, BufferDestinationTable, From f79cc86285389db02f0b59aacfdb38224bdd8bdf Mon Sep 17 00:00:00 2001 From: George Leung Date: Mon, 1 Dec 2025 15:39:26 -0800 Subject: [PATCH 2/9] fix test --- .../test/collapsing-merge-tree.test.ts | 115 +++++++++++------- 1 file changed, 73 insertions(+), 42 deletions(-) diff --git a/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts b/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts index 062f5bb16..59744a4c1 100644 --- a/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts +++ b/apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts @@ -16,11 +16,17 @@ import { expect } from "chai"; import * as path from "path"; // Import constants and utilities -import { TIMEOUTS, TEMPLATE_NAMES, APP_NAMES } from "./constants"; +import { + TIMEOUTS, + TEMPLATE_NAMES, + APP_NAMES, + SERVER_CONFIG, +} from "./constants"; import { waitForServerStart, waitForInfrastructureReady, + waitForStreamingFunctions, cleanupTestSuite, performGlobalCleanup, createTempTestDirectory, @@ -45,50 +51,61 @@ const TEST_PACKAGE_MANAGER = (process.env.TEST_PACKAGE_MANAGER || "npm") as | "pip"; describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", function () { - this.timeout(TIMEOUTS.SUITE); - describe("TypeScript Template - CollapsingMergeTree Engines", function () { let devProcess: ChildProcess | null = null; let testDir: string = ""; const appName = APP_NAMES.TYPESCRIPT_TESTS; before(async function () { - this.timeout(TIMEOUTS.SETUP); + this.timeout(TIMEOUTS.TEST_SETUP_MS); console.log("\nšŸš€ Setting up TypeScript CollapsingMergeTree test...\n"); - testDir = await createTempTestDirectory(); + testDir = createTempTestDirectory("ts-collapsing-mt"); console.log(`Created temporary directory: ${testDir}`); console.log("Setting up TypeScript project..."); - devProcess = await setupTypeScriptProject( - CLI_PATH, + await setupTypeScriptProject( testDir, TEMPLATE_NAMES.TYPESCRIPT_TESTS, - appName, + CLI_PATH, MOOSE_LIB_PATH, + appName, TEST_PACKAGE_MANAGER as "npm" | "pnpm", ); + console.log("Starting dev server..."); + devProcess = spawn(CLI_PATH, ["dev"], { + stdio: "pipe", + cwd: testDir, + env: process.env, + }); + console.log("Waiting for server to start..."); - await waitForServerStart(devProcess, TIMEOUTS.SERVER_START); + await waitForServerStart( + devProcess, + TIMEOUTS.SERVER_STARTUP_MS, + SERVER_CONFIG.startupMessage, + SERVER_CONFIG.url, + ); + + console.log("Waiting for streaming functions..."); + await waitForStreamingFunctions(); console.log("Waiting for infrastructure to be ready..."); - await waitForInfrastructureReady(devProcess, TIMEOUTS.INFRASTRUCTURE); + await waitForInfrastructureReady(); console.log("āœ… TypeScript test setup completed successfully\n"); }); after(async function () { - this.timeout(TIMEOUTS.CLEANUP); - await cleanupTestSuite( - devProcess, - testDir, - "TypeScript CollapsingMergeTree test", - ); + this.timeout(TIMEOUTS.CLEANUP_MS); + await cleanupTestSuite(devProcess, testDir, appName, { + logPrefix: "TypeScript CollapsingMergeTree test", + }); }); it("should create CollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL("CollapsingMergeTreeTest", "local"); console.log("CollapsingMergeTreeTest DDL:", ddl); @@ -100,7 +117,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "VersionedCollapsingMergeTreeTest", @@ -116,7 +133,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "ReplicatedCollapsingMergeTreeTest", @@ -137,7 +154,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "ReplicatedVersionedCollapsingMergeTreeTest", @@ -165,42 +182,58 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu const appName = APP_NAMES.PYTHON_TESTS; before(async function () { - this.timeout(TIMEOUTS.SETUP); + this.timeout(TIMEOUTS.TEST_SETUP_MS); console.log("\nšŸš€ Setting up Python CollapsingMergeTree test...\n"); - testDir = await createTempTestDirectory(); + testDir = createTempTestDirectory("py-collapsing-mt"); console.log(`Created temporary directory: ${testDir}`); console.log("Setting up Python project..."); - devProcess = await setupPythonProject( - CLI_PATH, + await setupPythonProject( testDir, TEMPLATE_NAMES.PYTHON_TESTS, - appName, + CLI_PATH, MOOSE_PY_LIB_PATH, - TEST_PACKAGE_MANAGER as "pip", + appName, ); + console.log("Starting dev server..."); + devProcess = spawn(CLI_PATH, ["dev"], { + stdio: "pipe", + cwd: testDir, + env: { + ...process.env, + VIRTUAL_ENV: path.join(testDir, ".venv"), + PATH: `${path.join(testDir, ".venv", "bin")}:${process.env.PATH}`, + }, + }); + console.log("Waiting for server to start..."); - await waitForServerStart(devProcess, TIMEOUTS.SERVER_START); + await waitForServerStart( + devProcess, + TIMEOUTS.SERVER_STARTUP_MS, + SERVER_CONFIG.startupMessage, + SERVER_CONFIG.url, + ); + + console.log("Waiting for streaming functions..."); + await waitForStreamingFunctions(); console.log("Waiting for infrastructure to be ready..."); - await waitForInfrastructureReady(devProcess, TIMEOUTS.INFRASTRUCTURE); + await waitForInfrastructureReady(); console.log("āœ… Python test setup completed successfully\n"); }); after(async function () { - this.timeout(TIMEOUTS.CLEANUP); - await cleanupTestSuite( - devProcess, - testDir, - "Python CollapsingMergeTree test", - ); + this.timeout(TIMEOUTS.CLEANUP_MS); + await cleanupTestSuite(devProcess, testDir, appName, { + logPrefix: "Python CollapsingMergeTree test", + }); }); it("should create CollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL("CollapsingMergeTreeTest", "local"); console.log("CollapsingMergeTreeTest DDL:", ddl); @@ -212,7 +245,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "VersionedCollapsingMergeTreeTest", @@ -228,7 +261,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "ReplicatedCollapsingMergeTreeTest", @@ -249,7 +282,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () { - this.timeout(TIMEOUTS.TEST); + this.timeout(TIMEOUTS.TEST_SETUP_MS); const ddl = await getTableDDL( "ReplicatedVersionedCollapsingMergeTreeTest", @@ -272,9 +305,7 @@ describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", fu }); after(async function () { - this.timeout(TIMEOUTS.CLEANUP); - await performGlobalCleanup( - "CollapsingMergeTree and VersionedCollapsingMergeTree engine tests", - ); + this.timeout(TIMEOUTS.GLOBAL_CLEANUP_MS); + await performGlobalCleanup(); }); }); From 67b27f749e4df5071e1ce52b917c4687d8b9891b Mon Sep 17 00:00:00 2001 From: George Leung Date: Mon, 1 Dec 2025 15:56:57 -0800 Subject: [PATCH 3/9] You're absolutely right! --- packages/py-moose-lib/moose_lib/internal.py | 60 ++++++++++++++++++++- packages/ts-moose-lib/src/blocks/runner.ts | 4 +- packages/ts-moose-lib/src/dmv2/internal.ts | 38 +++++++++++++ 3 files changed, 98 insertions(+), 4 deletions(-) diff --git a/packages/py-moose-lib/moose_lib/internal.py b/packages/py-moose-lib/moose_lib/internal.py index fd7215616..778d46faa 100644 --- a/packages/py-moose-lib/moose_lib/internal.py +++ b/packages/py-moose-lib/moose_lib/internal.py @@ -86,6 +86,19 @@ class SummingMergeTreeConfigDict(BaseEngineConfigDict): columns: Optional[List[str]] = None +class CollapsingMergeTreeConfigDict(BaseEngineConfigDict): + """Configuration for CollapsingMergeTree engine.""" + engine: Literal["CollapsingMergeTree"] = "CollapsingMergeTree" + sign: str + + +class VersionedCollapsingMergeTreeConfigDict(BaseEngineConfigDict): + """Configuration for VersionedCollapsingMergeTree engine.""" + engine: Literal["VersionedCollapsingMergeTree"] = "VersionedCollapsingMergeTree" + sign: str + version: str + + class ReplicatedMergeTreeConfigDict(BaseEngineConfigDict): """Configuration for ReplicatedMergeTree engine.""" engine: Literal["ReplicatedMergeTree"] = "ReplicatedMergeTree" @@ -117,6 +130,23 @@ class ReplicatedSummingMergeTreeConfigDict(BaseEngineConfigDict): columns: Optional[List[str]] = None +class ReplicatedCollapsingMergeTreeConfigDict(BaseEngineConfigDict): + """Configuration for ReplicatedCollapsingMergeTree engine.""" + engine: Literal["ReplicatedCollapsingMergeTree"] = "ReplicatedCollapsingMergeTree" + keeper_path: Optional[str] = None + replica_name: Optional[str] = None + sign: str + + +class ReplicatedVersionedCollapsingMergeTreeConfigDict(BaseEngineConfigDict): + """Configuration for ReplicatedVersionedCollapsingMergeTree engine.""" + engine: Literal["ReplicatedVersionedCollapsingMergeTree"] = "ReplicatedVersionedCollapsingMergeTree" + keeper_path: Optional[str] = None + replica_name: Optional[str] = None + sign: str + version: str + + class S3QueueConfigDict(BaseEngineConfigDict): """Configuration for S3Queue engine with all specific fields.""" engine: Literal["S3Queue"] = "S3Queue" @@ -183,10 +213,14 @@ class IcebergS3ConfigDict(BaseEngineConfigDict): ReplacingMergeTreeConfigDict, AggregatingMergeTreeConfigDict, SummingMergeTreeConfigDict, + CollapsingMergeTreeConfigDict, + VersionedCollapsingMergeTreeConfigDict, ReplicatedMergeTreeConfigDict, ReplicatedReplacingMergeTreeConfigDict, ReplicatedAggregatingMergeTreeConfigDict, ReplicatedSummingMergeTreeConfigDict, + ReplicatedCollapsingMergeTreeConfigDict, + ReplicatedVersionedCollapsingMergeTreeConfigDict, S3QueueConfigDict, S3ConfigDict, BufferConfigDict, @@ -455,7 +489,8 @@ def _convert_basic_engine_instance(engine: "EngineConfig") -> Optional[EngineCon """ from moose_lib.blocks import ( MergeTreeEngine, ReplacingMergeTreeEngine, - AggregatingMergeTreeEngine, SummingMergeTreeEngine + AggregatingMergeTreeEngine, SummingMergeTreeEngine, + CollapsingMergeTreeEngine, VersionedCollapsingMergeTreeEngine ) if isinstance(engine, MergeTreeEngine): @@ -469,6 +504,13 @@ def _convert_basic_engine_instance(engine: "EngineConfig") -> Optional[EngineCon return AggregatingMergeTreeConfigDict() elif isinstance(engine, SummingMergeTreeEngine): return SummingMergeTreeConfigDict(columns=engine.columns) + elif isinstance(engine, CollapsingMergeTreeEngine): + return CollapsingMergeTreeConfigDict(sign=engine.sign) + elif isinstance(engine, VersionedCollapsingMergeTreeEngine): + return VersionedCollapsingMergeTreeConfigDict( + sign=engine.sign, + version=engine.version + ) return None @@ -483,7 +525,8 @@ def _convert_replicated_engine_instance(engine: "EngineConfig") -> Optional[Engi """ from moose_lib.blocks import ( ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, - ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine + ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, ReplicatedVersionedCollapsingMergeTreeEngine ) if isinstance(engine, ReplicatedMergeTreeEngine): @@ -509,6 +552,19 @@ def _convert_replicated_engine_instance(engine: "EngineConfig") -> Optional[Engi replica_name=engine.replica_name, columns=engine.columns ) + elif isinstance(engine, ReplicatedCollapsingMergeTreeEngine): + return ReplicatedCollapsingMergeTreeConfigDict( + keeper_path=engine.keeper_path, + replica_name=engine.replica_name, + sign=engine.sign + ) + elif isinstance(engine, ReplicatedVersionedCollapsingMergeTreeEngine): + return ReplicatedVersionedCollapsingMergeTreeConfigDict( + keeper_path=engine.keeper_path, + replica_name=engine.replica_name, + sign=engine.sign, + version=engine.version + ) return None diff --git a/packages/ts-moose-lib/src/blocks/runner.ts b/packages/ts-moose-lib/src/blocks/runner.ts index 0e1dd6ec2..db6a0a021 100755 --- a/packages/ts-moose-lib/src/blocks/runner.ts +++ b/packages/ts-moose-lib/src/blocks/runner.ts @@ -56,7 +56,7 @@ const createBlocks = async (chClient: ClickHouseClient, blocks: Blocks) => { for (const query of blocks.setup) { try { console.log(`Creating block using query ${query}`); - await chClient.command({ + await chClient.command({ query, clickhouse_settings: { wait_end_of_query: 1, // Ensure at least once delivery and DDL acknowledgment @@ -79,7 +79,7 @@ const deleteBlocks = async (chClient: ClickHouseClient, blocks: Blocks) => { for (const query of blocks.teardown) { try { console.log(`Deleting block using query ${query}`); - await chClient.command({ + await chClient.command({ query, clickhouse_settings: { wait_end_of_query: 1, // Ensure at least once delivery and DDL acknowledgment diff --git a/packages/ts-moose-lib/src/dmv2/internal.ts b/packages/ts-moose-lib/src/dmv2/internal.ts index dedae6b58..44b0767fa 100644 --- a/packages/ts-moose-lib/src/dmv2/internal.ts +++ b/packages/ts-moose-lib/src/dmv2/internal.ts @@ -471,6 +471,23 @@ function convertBasicEngineConfig( }; } + case ClickHouseEngines.CollapsingMergeTree: { + const collapsingConfig = config as any; // CollapsingMergeTreeConfig + return { + engine: "CollapsingMergeTree", + sign: collapsingConfig.sign, + }; + } + + case ClickHouseEngines.VersionedCollapsingMergeTree: { + const versionedConfig = config as any; // VersionedCollapsingMergeTreeConfig + return { + engine: "VersionedCollapsingMergeTree", + sign: versionedConfig.sign, + version: versionedConfig.version, + }; + } + default: return undefined; } @@ -530,6 +547,27 @@ function convertReplicatedEngineConfig( }; } + case ClickHouseEngines.ReplicatedCollapsingMergeTree: { + const replicatedConfig = config as any; // ReplicatedCollapsingMergeTreeConfig + return { + engine: "ReplicatedCollapsingMergeTree", + keeperPath: replicatedConfig.keeperPath, + replicaName: replicatedConfig.replicaName, + sign: replicatedConfig.sign, + }; + } + + case ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree: { + const replicatedConfig = config as any; // ReplicatedVersionedCollapsingMergeTreeConfig + return { + engine: "ReplicatedVersionedCollapsingMergeTree", + keeperPath: replicatedConfig.keeperPath, + replicaName: replicatedConfig.replicaName, + sign: replicatedConfig.sign, + version: replicatedConfig.version, + }; + } + default: return undefined; } From 1083fa5312ab6c72d6c88b787a240dfb84a10566 Mon Sep 17 00:00:00 2001 From: George Leung Date: Mon, 1 Dec 2025 16:18:09 -0800 Subject: [PATCH 4/9] You're absolutely right! --- .../core/partial_infrastructure_map.rs | 58 +++++++++++++++++++ packages/ts-moose-lib/src/dmv2/internal.ts | 8 +-- .../ts-moose-lib/src/dmv2/sdk/olapTable.ts | 55 ++++++++++++++++++ .../src/ingest/engineTests.ts | 4 +- 4 files changed, 119 insertions(+), 6 deletions(-) diff --git a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs index 1bc781c1c..97d40f817 100644 --- a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs @@ -189,6 +189,12 @@ enum EngineConfig { columns: Option>, }, + #[serde(rename = "CollapsingMergeTree")] + CollapsingMergeTree { sign: String }, + + #[serde(rename = "VersionedCollapsingMergeTree")] + VersionedCollapsingMergeTree { sign: String, version: String }, + #[serde(rename = "ReplicatedMergeTree")] ReplicatedMergeTree { #[serde(alias = "keeperPath", default)] @@ -227,6 +233,25 @@ enum EngineConfig { columns: Option>, }, + #[serde(rename = "ReplicatedCollapsingMergeTree")] + ReplicatedCollapsingMergeTree { + #[serde(alias = "keeperPath", default)] + keeper_path: Option, + #[serde(alias = "replicaName", default)] + replica_name: Option, + sign: String, + }, + + #[serde(rename = "ReplicatedVersionedCollapsingMergeTree")] + ReplicatedVersionedCollapsingMergeTree { + #[serde(alias = "keeperPath", default)] + keeper_path: Option, + #[serde(alias = "replicaName", default)] + replica_name: Option, + sign: String, + version: String, + }, + #[serde(rename = "S3Queue")] S3Queue(Box), @@ -783,6 +808,17 @@ impl PartialInfrastructureMap { }) } + Some(EngineConfig::CollapsingMergeTree { sign }) => { + Ok(ClickhouseEngine::CollapsingMergeTree { sign: sign.clone() }) + } + + Some(EngineConfig::VersionedCollapsingMergeTree { sign, version }) => { + Ok(ClickhouseEngine::VersionedCollapsingMergeTree { + sign: sign.clone(), + version: version.clone(), + }) + } + Some(EngineConfig::ReplicatedMergeTree { keeper_path, replica_name, @@ -821,6 +857,28 @@ impl PartialInfrastructureMap { columns: columns.clone(), }), + Some(EngineConfig::ReplicatedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + }) => Ok(ClickhouseEngine::ReplicatedCollapsingMergeTree { + keeper_path: keeper_path.clone(), + replica_name: replica_name.clone(), + sign: sign.clone(), + }), + + Some(EngineConfig::ReplicatedVersionedCollapsingMergeTree { + keeper_path, + replica_name, + sign, + version, + }) => Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { + keeper_path: keeper_path.clone(), + replica_name: replica_name.clone(), + sign: sign.clone(), + version: version.clone(), + }), + Some(EngineConfig::S3Queue(config)) => { // Keep environment variable markers as-is - credentials will be resolved at runtime // S3Queue settings are handled in table_settings, not in the engine diff --git a/packages/ts-moose-lib/src/dmv2/internal.ts b/packages/ts-moose-lib/src/dmv2/internal.ts index 44b0767fa..d4502948d 100644 --- a/packages/ts-moose-lib/src/dmv2/internal.ts +++ b/packages/ts-moose-lib/src/dmv2/internal.ts @@ -92,7 +92,7 @@ interface CollapsingMergeTreeEngineConfig { interface VersionedCollapsingMergeTreeEngineConfig { engine: "VersionedCollapsingMergeTree"; sign: string; - version: string; + version: string; // Note: This is the version column name, not table version } interface ReplicatedMergeTreeEngineConfig { @@ -134,7 +134,7 @@ interface ReplicatedVersionedCollapsingMergeTreeEngineConfig { keeperPath?: string; replicaName?: string; sign: string; - version: string; + version: string; // Note: This is the version column name, not table version } interface S3QueueEngineConfig { @@ -484,7 +484,7 @@ function convertBasicEngineConfig( return { engine: "VersionedCollapsingMergeTree", sign: versionedConfig.sign, - version: versionedConfig.version, + version: versionedConfig.ver, }; } @@ -564,7 +564,7 @@ function convertReplicatedEngineConfig( keeperPath: replicatedConfig.keeperPath, replicaName: replicatedConfig.replicaName, sign: replicatedConfig.sign, - version: replicatedConfig.version, + version: replicatedConfig.ver, }; } diff --git a/packages/ts-moose-lib/src/dmv2/sdk/olapTable.ts b/packages/ts-moose-lib/src/dmv2/sdk/olapTable.ts index 25d564c30..d8c333df0 100644 --- a/packages/ts-moose-lib/src/dmv2/sdk/olapTable.ts +++ b/packages/ts-moose-lib/src/dmv2/sdk/olapTable.ts @@ -284,6 +284,25 @@ export type SummingMergeTreeConfig = BaseOlapConfig & { columns?: string[]; }; +/** + * Configuration for CollapsingMergeTree engine + * @template T The data type of the records stored in the table. + */ +export type CollapsingMergeTreeConfig = BaseOlapConfig & { + engine: ClickHouseEngines.CollapsingMergeTree; + sign: keyof T & string; // Sign column (1 = state, -1 = cancel) +}; + +/** + * Configuration for VersionedCollapsingMergeTree engine + * @template T The data type of the records stored in the table. + */ +export type VersionedCollapsingMergeTreeConfig = BaseOlapConfig & { + engine: ClickHouseEngines.VersionedCollapsingMergeTree; + sign: keyof T & string; // Sign column (1 = state, -1 = cancel) + ver: keyof T & string; // Version column for ordering state changes +}; + interface ReplicatedEngineProperties { keeperPath?: string; replicaName?: string; @@ -350,6 +369,38 @@ export type ReplicatedSummingMergeTreeConfig = Omit< engine: ClickHouseEngines.ReplicatedSummingMergeTree; }; +/** + * Configuration for ReplicatedCollapsingMergeTree engine + * @template T The data type of the records stored in the table. + * + * Note: keeperPath and replicaName are optional. Omit them for ClickHouse Cloud, + * which manages replication automatically. For self-hosted with ClickHouse Keeper, + * provide both parameters or neither (to use server defaults). + */ +export type ReplicatedCollapsingMergeTreeConfig = Omit< + CollapsingMergeTreeConfig, + "engine" +> & + ReplicatedEngineProperties & { + engine: ClickHouseEngines.ReplicatedCollapsingMergeTree; + }; + +/** + * Configuration for ReplicatedVersionedCollapsingMergeTree engine + * @template T The data type of the records stored in the table. + * + * Note: keeperPath and replicaName are optional. Omit them for ClickHouse Cloud, + * which manages replication automatically. For self-hosted with ClickHouse Keeper, + * provide both parameters or neither (to use server defaults). + */ +export type ReplicatedVersionedCollapsingMergeTreeConfig = Omit< + VersionedCollapsingMergeTreeConfig, + "engine" +> & + ReplicatedEngineProperties & { + engine: ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree; + }; + /** * Configuration for S3Queue engine - only non-alterable constructor parameters. * S3Queue-specific settings like 'mode', 'keeper_path', etc. should be specified @@ -511,10 +562,14 @@ type EngineConfig = | ReplacingMergeTreeConfig | AggregatingMergeTreeConfig | SummingMergeTreeConfig + | CollapsingMergeTreeConfig + | VersionedCollapsingMergeTreeConfig | ReplicatedMergeTreeConfig | ReplicatedReplacingMergeTreeConfig | ReplicatedAggregatingMergeTreeConfig | ReplicatedSummingMergeTreeConfig + | ReplicatedCollapsingMergeTreeConfig + | ReplicatedVersionedCollapsingMergeTreeConfig | S3QueueConfig | S3Config | BufferConfig diff --git a/templates/typescript-tests/src/ingest/engineTests.ts b/templates/typescript-tests/src/ingest/engineTests.ts index f93eb6024..1bb0af3a4 100644 --- a/templates/typescript-tests/src/ingest/engineTests.ts +++ b/templates/typescript-tests/src/ingest/engineTests.ts @@ -142,7 +142,7 @@ export const VersionedCollapsingMergeTreeTable = new OlapTable("VersionedCollapsingMergeTreeTest", { engine: ClickHouseEngines.VersionedCollapsingMergeTree, sign: "sign", - version: "version", + ver: "version", orderByFields: ["id", "timestamp"], }); @@ -249,7 +249,7 @@ export const ReplicatedVersionedCollapsingMergeTreeTable = "/clickhouse/tables/{database}/{shard}/replicated_versioned_collapsing_test", replicaName: "{replica}", sign: "sign", - version: "version", + ver: "version", orderByFields: ["id", "timestamp"], }, ); From 9f980a29ead53f59d5f98ee9410326700ad1afbb Mon Sep 17 00:00:00 2001 From: George Leung Date: Mon, 1 Dec 2025 18:20:34 -0800 Subject: [PATCH 5/9] fix --- templates/python-tests/src/ingest/engine_tests.py | 4 ++-- templates/typescript-tests/src/ingest/engineTests.ts | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/templates/python-tests/src/ingest/engine_tests.py b/templates/python-tests/src/ingest/engine_tests.py index 978f590c7..12411b830 100644 --- a/templates/python-tests/src/ingest/engine_tests.py +++ b/templates/python-tests/src/ingest/engine_tests.py @@ -1,7 +1,7 @@ # Test all supported ClickHouse engines to ensure proper configuration # These tables verify that all engine types can be created and configured correctly -from moose_lib import OlapTable, OlapConfig, Key, ClickHouseTTL, clickhouse_default, FixedString +from moose_lib import OlapTable, OlapConfig, Key, Int8, ClickHouseTTL, clickhouse_default, FixedString from moose_lib.blocks import ( MergeTreeEngine, ReplacingMergeTreeEngine, @@ -86,7 +86,7 @@ class FixedStringTestData(BaseModel): class CollapsingTestData(EngineTestData): """Test data model for CollapsingMergeTree and VersionedCollapsingMergeTree testing""" - sign: int # For CollapsingMergeTree (1 = state, -1 = cancel) + sign: Int8 # For CollapsingMergeTree (1 = state, -1 = cancel) class EngineTestDataSample(BaseModel): """Test data model for engine testing""" diff --git a/templates/typescript-tests/src/ingest/engineTests.ts b/templates/typescript-tests/src/ingest/engineTests.ts index 1bb0af3a4..2615e08dd 100644 --- a/templates/typescript-tests/src/ingest/engineTests.ts +++ b/templates/typescript-tests/src/ingest/engineTests.ts @@ -3,6 +3,7 @@ import { ClickHouseEngines, Key, DateTime, + Int8, ClickHouseTTL, ClickHouseDefault, UInt32, @@ -26,7 +27,7 @@ export interface EngineTestData { // Test data model for CollapsingMergeTree and VersionedCollapsingMergeTree testing export interface CollapsingTestData extends EngineTestData { - sign: number; // For CollapsingMergeTree (1 = state, -1 = cancel) + sign: Int8; // For CollapsingMergeTree (1 = state, -1 = cancel) } export interface EngineTestDataSample { From 4b57669d5cb02abae31905f196a55b09dd70824f Mon Sep 17 00:00:00 2001 From: George Leung Date: Wed, 3 Dec 2025 17:25:09 -0800 Subject: [PATCH 6/9] fixes --- .../core/partial_infrastructure_map.rs | 12 +- .../src/framework/python/generate.rs | 4 +- .../src/framework/typescript/generate.rs | 4 +- .../llm-docs/python/table-setup.md | 6 + .../llm-docs/typescript/table-setup.md | 28 +++++ .../src/pages/moose/olap/model-table.mdx | 117 ++++++++++++++++++ packages/py-moose-lib/moose_lib/blocks.py | 16 +-- packages/py-moose-lib/moose_lib/internal.py | 8 +- packages/ts-moose-lib/src/dmv2/internal.ts | 18 ++- .../python-tests/src/ingest/engine_tests.py | 4 +- 10 files changed, 187 insertions(+), 30 deletions(-) diff --git a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs index cd954cd6b..1481a671b 100644 --- a/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/partial_infrastructure_map.rs @@ -193,7 +193,7 @@ enum EngineConfig { CollapsingMergeTree { sign: String }, #[serde(rename = "VersionedCollapsingMergeTree")] - VersionedCollapsingMergeTree { sign: String, version: String }, + VersionedCollapsingMergeTree { sign: String, ver: String }, #[serde(rename = "ReplicatedMergeTree")] ReplicatedMergeTree { @@ -249,7 +249,7 @@ enum EngineConfig { #[serde(alias = "replicaName", default)] replica_name: Option, sign: String, - version: String, + ver: String, }, #[serde(rename = "S3Queue")] @@ -820,10 +820,10 @@ impl PartialInfrastructureMap { Ok(ClickhouseEngine::CollapsingMergeTree { sign: sign.clone() }) } - Some(EngineConfig::VersionedCollapsingMergeTree { sign, version }) => { + Some(EngineConfig::VersionedCollapsingMergeTree { sign, ver }) => { Ok(ClickhouseEngine::VersionedCollapsingMergeTree { sign: sign.clone(), - version: version.clone(), + version: ver.clone(), }) } @@ -879,12 +879,12 @@ impl PartialInfrastructureMap { keeper_path, replica_name, sign, - version, + ver, }) => Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { keeper_path: keeper_path.clone(), replica_name: replica_name.clone(), sign: sign.clone(), - version: version.clone(), + version: ver.clone(), }), Some(EngineConfig::S3Queue(config)) => { diff --git a/apps/framework-cli/src/framework/python/generate.rs b/apps/framework-cli/src/framework/python/generate.rs index e95f52ea7..11e421e7d 100644 --- a/apps/framework-cli/src/framework/python/generate.rs +++ b/apps/framework-cli/src/framework/python/generate.rs @@ -823,7 +823,7 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri writeln!(output, " engine=CollapsingMergeTreeEngine(sign={:?}),", sign).unwrap(); } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { - writeln!(output, " engine=VersionedCollapsingMergeTreeEngine(sign={:?}, version={:?}),", sign, version).unwrap(); + writeln!(output, " engine=VersionedCollapsingMergeTreeEngine(sign={:?}, ver={:?}),", sign, version).unwrap(); } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedMergeTree { keeper_path, @@ -909,7 +909,7 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri params.push(format!("keeper_path={:?}, replica_name={:?}", path, name)); } params.push(format!("sign={:?}", sign)); - params.push(format!("version={:?}", version)); + params.push(format!("ver={:?}", version)); write!(output, "{}", params.join(", ")).unwrap(); writeln!(output, "),").unwrap(); } diff --git a/apps/framework-cli/src/framework/typescript/generate.rs b/apps/framework-cli/src/framework/typescript/generate.rs index 7f61434f9..7f5d3f810 100644 --- a/apps/framework-cli/src/framework/typescript/generate.rs +++ b/apps/framework-cli/src/framework/typescript/generate.rs @@ -738,7 +738,7 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::VersionedCollapsingMergeTree { sign, version } => { writeln!(output, " engine: ClickHouseEngines.VersionedCollapsingMergeTree,").unwrap(); writeln!(output, " sign: {:?},", sign).unwrap(); - writeln!(output, " version: {:?},", version).unwrap(); + writeln!(output, " ver: {:?},", version).unwrap(); } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplicatedMergeTree { keeper_path, replica_name } => { writeln!(output, " engine: ClickHouseEngines.ReplicatedMergeTree,").unwrap(); @@ -795,7 +795,7 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> writeln!(output, " replicaName: {:?},", name).unwrap(); } writeln!(output, " sign: {:?},", sign).unwrap(); - writeln!(output, " version: {:?},", version).unwrap(); + writeln!(output, " ver: {:?},", version).unwrap(); } crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::S3 { path, diff --git a/apps/framework-docs/llm-docs/python/table-setup.md b/apps/framework-docs/llm-docs/python/table-setup.md index a85d634ad..523124d42 100644 --- a/apps/framework-docs/llm-docs/python/table-setup.md +++ b/apps/framework-docs/llm-docs/python/table-setup.md @@ -89,10 +89,14 @@ from moose_lib.blocks import ( ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, + CollapsingMergeTreeEngine, + VersionedCollapsingMergeTreeEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, + ReplicatedCollapsingMergeTreeEngine, + ReplicatedVersionedCollapsingMergeTreeEngine, S3QueueEngine ) from pydantic import BaseModel @@ -403,6 +407,8 @@ Available replicated engines: - `ReplicatedReplacingMergeTreeEngine` - Replicated with deduplication - `ReplicatedAggregatingMergeTreeEngine` - Replicated with aggregation - `ReplicatedSummingMergeTreeEngine` - Replicated with summation +- `ReplicatedCollapsingMergeTreeEngine` - Replicated with state collapsing +- `ReplicatedVersionedCollapsingMergeTreeEngine` - Replicated with versioned collapsing ### Cluster-Aware Replicated Tables diff --git a/apps/framework-docs/llm-docs/typescript/table-setup.md b/apps/framework-docs/llm-docs/typescript/table-setup.md index da469d95e..78a6518c3 100644 --- a/apps/framework-docs/llm-docs/typescript/table-setup.md +++ b/apps/framework-docs/llm-docs/typescript/table-setup.md @@ -83,6 +83,34 @@ type OlapConfig = orderByFields?: (keyof T & string)[]; settings?: { [key: string]: string }; } + | { + engine: ClickHouseEngines.CollapsingMergeTree; + sign: keyof T & string; // Required: sign column (Int8: 1 = state, -1 = cancel) + orderByFields?: (keyof T & string)[]; + settings?: { [key: string]: string }; + } + | { + engine: ClickHouseEngines.VersionedCollapsingMergeTree; + sign: keyof T & string; // Required: sign column (Int8) + ver: keyof T & string; // Required: version column for ordering + orderByFields?: (keyof T & string)[]; + settings?: { [key: string]: string }; + } + | { + engine: ClickHouseEngines.ReplicatedCollapsingMergeTree; + keeperPath?: string; + replicaName?: string; + sign: keyof T & string; + orderByFields?: (keyof T & string)[]; + } + | { + engine: ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree; + keeperPath?: string; + replicaName?: string; + sign: keyof T & string; + ver: keyof T & string; + orderByFields?: (keyof T & string)[]; + } | { engine: ClickHouseEngines.S3Queue; s3Path: string; // S3 bucket path diff --git a/apps/framework-docs/src/pages/moose/olap/model-table.mdx b/apps/framework-docs/src/pages/moose/olap/model-table.mdx index 19b61e2eb..b28412de5 100644 --- a/apps/framework-docs/src/pages/moose/olap/model-table.mdx +++ b/apps/framework-docs/src/pages/moose/olap/model-table.mdx @@ -680,6 +680,121 @@ ClickHouse's ReplacingMergeTree engine runs deduplication in the background AFTE For more details, see the [ClickHouse documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree). +#### Collapsing State Changes (`CollapsingMergeTree`) +Use the `CollapsingMergeTree` engine to efficiently track object state changes by collapsing insert/cancel row pairs: + + +```ts filename="CollapsingTable.ts" copy +import { OlapTable, ClickHouseEngines, Int8 } from "@514labs/moose-lib"; + +interface UserActivity { + userId: Key; + pageViews: number; + duration: number; + sign: Int8; // Required: 1 = state row, -1 = cancel row +} + +// Track user activity with state collapsing +const activityTable = new OlapTable("user_activity", { + engine: ClickHouseEngines.CollapsingMergeTree, + sign: "sign", + orderByFields: ["userId"] +}); +``` + + + +```py filename="CollapsingTable.py" copy +from moose_lib import Key, Int8, OlapTable, OlapConfig +from moose_lib.blocks import CollapsingMergeTreeEngine +from pydantic import BaseModel + +class UserActivity(BaseModel): + user_id: Key[str] + page_views: int + duration: int + sign: Int8 # Required: 1 = state row, -1 = cancel row + +# Track user activity with state collapsing +activity_table = OlapTable[UserActivity]("user_activity", OlapConfig( + order_by_fields=["user_id"], + engine=CollapsingMergeTreeEngine(sign="sign") +)) +``` + + + +To update a record, insert two rows: +1. A cancel row (`sign = -1`) with the old values +2. A state row (`sign = 1`) with the new values + +During background merges, ClickHouse collapses matching pairs, keeping only the latest state. + +**Important**: Use aggregation in queries to get accurate results: +```sql +SELECT userId, sum(pageViews * sign) as pageViews +FROM user_activity +GROUP BY userId +HAVING sum(sign) > 0 +``` + + +#### Versioned Collapsing (`VersionedCollapsingMergeTree`) +Use `VersionedCollapsingMergeTree` when you need collapsing with out-of-order inserts. The version column ensures correct collapse ordering regardless of insertion order: + + +```ts filename="VersionedCollapsingTable.ts" copy +import { OlapTable, ClickHouseEngines, Int8 } from "@514labs/moose-lib"; + +interface UserState { + userId: Key; + pageViews: number; + duration: number; + sign: Int8; // Required: 1 = state row, -1 = cancel row + version: number; // Required: version for ordering state changes +} + +// Track user state with versioned collapsing +const stateTable = new OlapTable("user_state", { + engine: ClickHouseEngines.VersionedCollapsingMergeTree, + sign: "sign", + ver: "version", + orderByFields: ["userId"] +}); +``` + + + +```py filename="VersionedCollapsingTable.py" copy +from moose_lib import Key, Int8, OlapTable, OlapConfig +from moose_lib.blocks import VersionedCollapsingMergeTreeEngine +from pydantic import BaseModel + +class UserState(BaseModel): + user_id: Key[str] + page_views: int + duration: int + sign: Int8 # Required: 1 = state row, -1 = cancel row + version: int # Required: version for ordering state changes + +# Track user state with versioned collapsing +state_table = OlapTable[UserState]("user_state", OlapConfig( + order_by_fields=["user_id"], + engine=VersionedCollapsingMergeTreeEngine(sign="sign", ver="version") +)) +``` + + + +**CollapsingMergeTree**: Requires strictly consecutive insertion order. Use when you control insertion order (e.g., single writer). + +**VersionedCollapsingMergeTree**: Uses a version column to handle out-of-order inserts. Use when multiple threads/sources insert data or order isn't guaranteed. + +Both engines require the `sign` column to be `Int8` type. + +For more details, see the ClickHouse documentation on [CollapsingMergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/collapsingmergetree) and [VersionedCollapsingMergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/versionedcollapsingmergetree). + + #### Streaming from S3 (`S3Queue`) Use the `S3Queue` engine to automatically ingest data from S3 buckets as files are added: @@ -1018,6 +1133,8 @@ Replicated engines provide high availability and data replication across multipl - `ReplicatedReplacingMergeTree` - Replicated with deduplication - `ReplicatedAggregatingMergeTree` - Replicated with aggregation - `ReplicatedSummingMergeTree` - Replicated with summation +- `ReplicatedCollapsingMergeTree` - Replicated with state collapsing +- `ReplicatedVersionedCollapsingMergeTree` - Replicated with versioned collapsing ```ts filename="ReplicatedEngines.ts" copy diff --git a/packages/py-moose-lib/moose_lib/blocks.py b/packages/py-moose-lib/moose_lib/blocks.py index ed1c72dd1..f6443cfde 100644 --- a/packages/py-moose-lib/moose_lib/blocks.py +++ b/packages/py-moose-lib/moose_lib/blocks.py @@ -85,16 +85,16 @@ class VersionedCollapsingMergeTreeEngine(EngineConfig): Args: sign: Column name indicating row type (1 = state, -1 = cancel) - version: Column name for object state versioning + ver: Column name for object state versioning """ sign: str - version: str + ver: str def __post_init__(self): if not self.sign: raise ValueError("sign column is required for VersionedCollapsingMergeTree") - if not self.version: - raise ValueError("version column is required for VersionedCollapsingMergeTree") + if not self.ver: + raise ValueError("ver column is required for VersionedCollapsingMergeTree") @dataclass class ReplicatedMergeTreeEngine(EngineConfig): @@ -218,14 +218,14 @@ class ReplicatedVersionedCollapsingMergeTreeEngine(EngineConfig): replica_name: Replica name (e.g., '{replica}') Optional: omit for ClickHouse Cloud which manages replication automatically sign: Column name indicating row type (1 = state, -1 = cancel) - version: Column name for object state versioning + ver: Column name for object state versioning Note: Both keeper_path and replica_name must be provided together, or both omitted. """ keeper_path: Optional[str] = None replica_name: Optional[str] = None sign: str = field(default=None) - version: str = field(default=None) + ver: str = field(default=None) def __post_init__(self): # Both must be provided or both must be None @@ -233,8 +233,8 @@ def __post_init__(self): raise ValueError("keeper_path and replica_name must both be provided or both be None") if not self.sign: raise ValueError("sign column is required for ReplicatedVersionedCollapsingMergeTree") - if not self.version: - raise ValueError("version column is required for ReplicatedVersionedCollapsingMergeTree") + if not self.ver: + raise ValueError("ver column is required for ReplicatedVersionedCollapsingMergeTree") @dataclass class S3QueueEngine(EngineConfig): diff --git a/packages/py-moose-lib/moose_lib/internal.py b/packages/py-moose-lib/moose_lib/internal.py index 68c08773d..ea016c77f 100644 --- a/packages/py-moose-lib/moose_lib/internal.py +++ b/packages/py-moose-lib/moose_lib/internal.py @@ -96,7 +96,7 @@ class VersionedCollapsingMergeTreeConfigDict(BaseEngineConfigDict): """Configuration for VersionedCollapsingMergeTree engine.""" engine: Literal["VersionedCollapsingMergeTree"] = "VersionedCollapsingMergeTree" sign: str - version: str + ver: str class ReplicatedMergeTreeConfigDict(BaseEngineConfigDict): @@ -144,7 +144,7 @@ class ReplicatedVersionedCollapsingMergeTreeConfigDict(BaseEngineConfigDict): keeper_path: Optional[str] = None replica_name: Optional[str] = None sign: str - version: str + ver: str class S3QueueConfigDict(BaseEngineConfigDict): @@ -511,7 +511,7 @@ def _convert_basic_engine_instance(engine: "EngineConfig") -> Optional[EngineCon elif isinstance(engine, VersionedCollapsingMergeTreeEngine): return VersionedCollapsingMergeTreeConfigDict( sign=engine.sign, - version=engine.version + ver=engine.ver ) return None @@ -565,7 +565,7 @@ def _convert_replicated_engine_instance(engine: "EngineConfig") -> Optional[Engi keeper_path=engine.keeper_path, replica_name=engine.replica_name, sign=engine.sign, - version=engine.version + ver=engine.ver ) return None diff --git a/packages/ts-moose-lib/src/dmv2/internal.ts b/packages/ts-moose-lib/src/dmv2/internal.ts index c4a9341ee..335c1014e 100644 --- a/packages/ts-moose-lib/src/dmv2/internal.ts +++ b/packages/ts-moose-lib/src/dmv2/internal.ts @@ -25,6 +25,8 @@ import { ReplicatedReplacingMergeTreeConfig, ReplicatedAggregatingMergeTreeConfig, ReplicatedSummingMergeTreeConfig, + ReplicatedCollapsingMergeTreeConfig, + ReplicatedVersionedCollapsingMergeTreeConfig, S3QueueConfig, } from "./sdk/olapTable"; import { @@ -105,7 +107,7 @@ interface CollapsingMergeTreeEngineConfig { interface VersionedCollapsingMergeTreeEngineConfig { engine: "VersionedCollapsingMergeTree"; sign: string; - version: string; // Note: This is the version column name, not table version + ver: string; } interface ReplicatedMergeTreeEngineConfig { @@ -147,7 +149,7 @@ interface ReplicatedVersionedCollapsingMergeTreeEngineConfig { keeperPath?: string; replicaName?: string; sign: string; - version: string; // Note: This is the version column name, not table version + ver: string; } interface S3QueueEngineConfig { @@ -431,7 +433,9 @@ function hasReplicatedEngine( | ReplicatedMergeTreeConfig | ReplicatedReplacingMergeTreeConfig | ReplicatedAggregatingMergeTreeConfig - | ReplicatedSummingMergeTreeConfig { + | ReplicatedSummingMergeTreeConfig + | ReplicatedCollapsingMergeTreeConfig + | ReplicatedVersionedCollapsingMergeTreeConfig { if (!("engine" in config)) { return false; } @@ -442,7 +446,9 @@ function hasReplicatedEngine( engine === ClickHouseEngines.ReplicatedMergeTree || engine === ClickHouseEngines.ReplicatedReplacingMergeTree || engine === ClickHouseEngines.ReplicatedAggregatingMergeTree || - engine === ClickHouseEngines.ReplicatedSummingMergeTree + engine === ClickHouseEngines.ReplicatedSummingMergeTree || + engine === ClickHouseEngines.ReplicatedCollapsingMergeTree || + engine === ClickHouseEngines.ReplicatedVersionedCollapsingMergeTree ); } @@ -503,7 +509,7 @@ function convertBasicEngineConfig( return { engine: "VersionedCollapsingMergeTree", sign: versionedConfig.sign, - version: versionedConfig.ver, + ver: versionedConfig.ver, }; } @@ -583,7 +589,7 @@ function convertReplicatedEngineConfig( keeperPath: replicatedConfig.keeperPath, replicaName: replicatedConfig.replicaName, sign: replicatedConfig.sign, - version: replicatedConfig.ver, + ver: replicatedConfig.ver, }; } diff --git a/templates/python-tests/src/ingest/engine_tests.py b/templates/python-tests/src/ingest/engine_tests.py index 12411b830..59cd585eb 100644 --- a/templates/python-tests/src/ingest/engine_tests.py +++ b/templates/python-tests/src/ingest/engine_tests.py @@ -174,7 +174,7 @@ class EngineTestDataSample(BaseModel): versioned_collapsing_merge_tree_table = OlapTable[CollapsingTestData]( "VersionedCollapsingMergeTreeTest", OlapConfig( - engine=VersionedCollapsingMergeTreeEngine(sign="sign", version="version"), + engine=VersionedCollapsingMergeTreeEngine(sign="sign", ver="version"), order_by_fields=["id", "timestamp"] ) ) @@ -275,7 +275,7 @@ class EngineTestDataSample(BaseModel): keeper_path="/clickhouse/tables/{database}/{shard}/replicated_versioned_collapsing_test", replica_name="{replica}", sign="sign", - version="version" + ver="version" ), order_by_fields=["id", "timestamp"] ) From b47e0f5b9a66b39fa09bfbc4c745b3a4b3a8723d Mon Sep 17 00:00:00 2001 From: George Leung Date: Wed, 3 Dec 2025 17:33:52 -0800 Subject: [PATCH 7/9] stuff --- packages/py-moose-lib/moose_lib/blocks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/py-moose-lib/moose_lib/blocks.py b/packages/py-moose-lib/moose_lib/blocks.py index f6443cfde..1e37fe900 100644 --- a/packages/py-moose-lib/moose_lib/blocks.py +++ b/packages/py-moose-lib/moose_lib/blocks.py @@ -22,6 +22,8 @@ class ClickHouseEngines(Enum): ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree" ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree" ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree" + ReplicatedCollapsingMergeTree = "ReplicatedCollapsingMergeTree" + ReplicatedVersionedCollapsingMergeTree = "ReplicatedVersionedCollapsingMergeTree" # ========================== # New Engine Configuration Classes From f301c8fa7b27f570cc60cbaee5c5a6bf30e19d95 Mon Sep 17 00:00:00 2001 From: George Leung Date: Wed, 3 Dec 2025 18:12:16 -0800 Subject: [PATCH 8/9] clean up --- .../infrastructure/olap/clickhouse/queries.rs | 50 ++++++------------- 1 file changed, 14 insertions(+), 36 deletions(-) diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs index 93b3f0c3f..a26468a33 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs @@ -592,12 +592,7 @@ impl ClickhouseEngine { } // First two params are keeper_path and replica_name - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); - - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); // Optional 3rd param is ver, optional 4th is is_deleted let ver = params.get(2).cloned(); @@ -646,6 +641,14 @@ impl ClickhouseEngine { } } + /// Extract and normalize replication params from parsed CSV parameters + /// Returns normalized (keeper_path, replica_name) tuple + fn extract_replication_params(params: &[String]) -> (Option, Option) { + let keeper_path = params.first().cloned(); + let replica_name = params.get(1).cloned(); + Self::normalize_replication_params(keeper_path, replica_name) + } + /// Parse SharedMergeTree or ReplicatedMergeTree /// Format: (path, replica) or () for automatic configuration fn parse_distributed_merge_tree(value: &str) -> Result { @@ -672,12 +675,7 @@ impl ClickhouseEngine { } // First two params are keeper_path and replica_name - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); - - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); Ok(ClickhouseEngine::ReplicatedMergeTree { keeper_path, @@ -725,12 +723,7 @@ impl ClickhouseEngine { } // First two params are keeper_path and replica_name - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); - - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); Ok(ClickhouseEngine::ReplicatedAggregatingMergeTree { keeper_path, @@ -776,12 +769,7 @@ impl ClickhouseEngine { } // First two params are keeper_path and replica_name - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); - - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); // Additional params are column names (if any) let columns = if params.len() > 2 { @@ -854,14 +842,9 @@ impl ClickhouseEngine { } // Full parameters: keeper_path, replica_name, sign - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); let sign = params[2].clone(); - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); - Ok(ClickhouseEngine::ReplicatedCollapsingMergeTree { keeper_path, replica_name, @@ -920,15 +903,10 @@ impl ClickhouseEngine { } // Full parameters: keeper_path, replica_name, sign, version - let keeper_path = params.first().cloned(); - let replica_name = params.get(1).cloned(); + let (keeper_path, replica_name) = Self::extract_replication_params(¶ms); let sign = params[2].clone(); let version = params[3].clone(); - // Normalize defaults back to None - let (keeper_path, replica_name) = - Self::normalize_replication_params(keeper_path, replica_name); - Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree { keeper_path, replica_name, From 0676e94d66b338c8ad611df5a96f0877e5fd11ab Mon Sep 17 00:00:00 2001 From: George Leung Date: Wed, 3 Dec 2025 18:51:42 -0800 Subject: [PATCH 9/9] ouch --- apps/framework-cli/src/framework/python/generate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/framework-cli/src/framework/python/generate.rs b/apps/framework-cli/src/framework/python/generate.rs index 11e421e7d..9029d539e 100644 --- a/apps/framework-cli/src/framework/python/generate.rs +++ b/apps/framework-cli/src/framework/python/generate.rs @@ -562,7 +562,7 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri .unwrap(); writeln!( output, - "from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, S3QueueEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine" + "from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, CollapsingMergeTreeEngine, VersionedCollapsingMergeTreeEngine, S3QueueEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, ReplicatedCollapsingMergeTreeEngine, ReplicatedVersionedCollapsingMergeTreeEngine" ) .unwrap(); writeln!(output).unwrap(); @@ -1157,7 +1157,7 @@ from moose_lib import Key, IngestPipeline, IngestPipelineConfig, OlapTable, Olap from moose_lib.data_models import ClickHouseJson from moose_lib import Point, Ring, LineString, MultiLineString, Polygon, MultiPolygon, FixedString from moose_lib import clickhouse_default, ClickHouseCodec, ClickHouseMaterialized, LifeCycle, ClickHouseTTL -from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, S3QueueEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine +from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, CollapsingMergeTreeEngine, VersionedCollapsingMergeTreeEngine, S3QueueEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine, ReplicatedCollapsingMergeTreeEngine, ReplicatedVersionedCollapsingMergeTreeEngine class Foo(BaseModel): primary_key: Key[str]