Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions apps/framework-cli-e2e/test/utils/schema-definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ export const TYPESCRIPT_TEST_SCHEMAS: ExpectedTableSchema[] = [
{ name: "isDeleted", type: "Bool" },
],
},
{
tableName: "NullEngineTest",
columns: [
{ name: "id", type: "String" },
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
{ name: "value", type: "Float64" },
{ name: "category", type: "String" },
{ name: "version", type: "Float64" },
{ name: "isDeleted", type: "Bool" },
],
engine: "Null",
},
{
tableName: "MergeTreeTestExpr",
columns: [
Expand Down Expand Up @@ -458,6 +470,18 @@ export const PYTHON_TEST_SCHEMAS: ExpectedTableSchema[] = [
{ name: "is_deleted", type: "Bool" },
],
},
{
tableName: "NullEngineTest",
columns: [
{ name: "id", type: "String" },
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
{ name: "value", type: "Int64" },
{ name: "category", type: "String" },
{ name: "version", type: "Int64" },
{ name: "is_deleted", type: "Bool" },
],
engine: "Null",
},
{
tableName: "MergeTreeTestExpr",
columns: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ struct DistributedConfig {
#[derive(Debug, Deserialize)]
#[serde(tag = "engine", rename_all = "camelCase")]
enum EngineConfig {
#[serde(rename = "Null")]
Null {},

#[serde(rename = "MergeTree")]
MergeTree {},

Expand Down Expand Up @@ -747,6 +750,8 @@ impl PartialInfrastructureMap {
partial_table: &PartialTable,
) -> Result<ClickhouseEngine, DmV2LoadingError> {
match &partial_table.engine_config {
Some(EngineConfig::Null {}) => Ok(ClickhouseEngine::Null),

Some(EngineConfig::MergeTree {}) => Ok(ClickhouseEngine::MergeTree),

Some(EngineConfig::ReplacingMergeTree { ver, is_deleted }) => {
Expand Down
4 changes: 4 additions & 0 deletions apps/framework-cli/src/framework/python/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,10 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option<LifeCycle>) -> Stri
crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::MergeTree => {
writeln!(output, " engine=MergeTreeEngine(),").unwrap();
}
crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::Null => {
writeln!(output, " engine=NullEngine(),").unwrap();
}

crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::ReplacingMergeTree { ver, is_deleted } => {
// Emit ReplacingMergeTreeEngine with parameters if present
write!(output, " engine=ReplacingMergeTreeEngine(").unwrap();
Expand Down
5 changes: 5 additions & 0 deletions apps/framework-cli/src/framework/typescript/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,11 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option<LifeCycle>) ->
writeln!(output, " }},").unwrap();
}
}
crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::Null => {
// Table with engine Null : we expose the Null engine in TS
// (assuming you have ClickHouseEngines.Null in your TS enum)
writeln!(output, " engine: ClickHouseEngines.Null,").unwrap();
}
crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine::MergeTree => {
writeln!(output, " engine: ClickHouseEngines.MergeTree,").unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,16 @@ impl ClickHouseTableDiffStrategy {
// Skip population in production (user must handle manually)
// Only populate in dev for new MVs with non-S3Queue sources
if is_new && !has_s3queue_source && !is_production {
// If the target table uses the Null engine, skip truncation because there's nothing to truncate
let target_is_null_engine = tables.values().any(|table| {
matches!(table.engine, ClickhouseEngine::Null)
&& Self::table_matches_mv_target(
table,
&mv_stmt.target_table,
&mv_stmt.target_database,
)
});

log::info!(
"Adding population operation for materialized view '{}'",
sql_resource.name
Expand All @@ -408,7 +418,7 @@ impl ClickHouseTableDiffStrategy {
.into_iter()
.map(|t| t.qualified_name())
.collect(),
should_truncate: true,
should_truncate: !target_is_null_engine,
});
}

Expand All @@ -417,6 +427,30 @@ impl ClickHouseTableDiffStrategy {
}
}
}

/// Helper: does the given table correspond to the MV target (by name and optional database)?
fn table_matches_mv_target(
table: &Table,
target_table: &str,
target_database: &Option<String>,
) -> bool {
if table.name == target_table {
return true;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Table matching ignores database when names match

The table_matches_mv_target function returns true when table names match without checking if databases also match. The early return at line 437-439 (if table.name == target_table { return true; }) bypasses all database checks. When a Null engine table exists with the same name as the MV target but in a different database, this incorrectly identifies it as the target. This causes target_is_null_engine to be true and should_truncate to be set to false when it should be true, potentially leading to incorrect materialized view population behavior.

Fix in Cursor Fix in Web


match (&table.database, target_database) {
(Some(table_db), Some(target_db)) => {
table_db == target_db && table.name == target_table
|| format!("{table_db}.{}", table.name) == format!("{target_db}.{target_table}")
}
(Some(table_db), None) => table_db.is_empty() && table.name == target_table,
(None, Some(target_db)) => {
format!("{target_db}.{target_table}") == table.name
|| format!("{target_db}.{}", table.name) == target_table
}
_ => false,
}
}
}

impl TableDiffStrategy for ClickHouseTableDiffStrategy {
Expand Down
23 changes: 23 additions & 0 deletions apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl BufferEngine {
pub enum ClickhouseEngine {
#[default]
MergeTree,
Null,
ReplacingMergeTree {
// Optional version column for deduplication
ver: Option<String>,
Expand Down Expand Up @@ -330,6 +331,7 @@ pub enum ClickhouseEngine {
impl Into<String> for ClickhouseEngine {
fn into(self) -> String {
match self {
ClickhouseEngine::Null => "Null".to_string(),
ClickhouseEngine::MergeTree => "MergeTree".to_string(),
ClickhouseEngine::ReplacingMergeTree { ver, is_deleted } => {
Self::serialize_replacing_merge_tree(&ver, &is_deleted)
Expand Down Expand Up @@ -422,6 +424,11 @@ impl<'a> TryFrom<&'a str> for ClickhouseEngine {
type Error = &'a str;

fn try_from(value: &'a str) -> Result<Self, &'a str> {
// Try to parse Null engine
if value.eq_ignore_ascii_case("Null") {
return Ok(ClickhouseEngine::Null);
}

// Try to parse distributed variants first (SharedMergeTree, ReplicatedMergeTree)
if let Some(engine) = Self::try_parse_distributed_engine(value) {
return engine;
Expand Down Expand Up @@ -775,6 +782,7 @@ impl ClickhouseEngine {
let engine_name = value.strip_prefix("Shared").unwrap_or(value);

match engine_name {
"Null" => Ok(ClickhouseEngine::Null),
"MergeTree" => Ok(ClickhouseEngine::MergeTree),
"ReplacingMergeTree" => Ok(ClickhouseEngine::ReplacingMergeTree {
ver: None,
Expand Down Expand Up @@ -1050,6 +1058,7 @@ impl ClickhouseEngine {
/// Convert engine to string for proto storage (no sensitive data)
pub fn to_proto_string(&self) -> String {
match self {
ClickhouseEngine::Null => "Null".to_string(),
ClickhouseEngine::MergeTree => "MergeTree".to_string(),
ClickhouseEngine::ReplacingMergeTree { ver, is_deleted } => {
Self::serialize_replacing_merge_tree(ver, is_deleted)
Expand Down Expand Up @@ -1745,6 +1754,9 @@ impl ClickhouseEngine {
// Without hashing "null", both would produce identical hashes.

match self {
ClickhouseEngine::Null => {
hasher.update("Null".as_bytes());
}
ClickhouseEngine::MergeTree => {
hasher.update("MergeTree".as_bytes());
}
Expand Down Expand Up @@ -2249,6 +2261,7 @@ pub fn create_table_query(
reg.register_escape_fn(no_escape);

let engine = match &table.engine {
ClickhouseEngine::Null => "Null".to_string(),
ClickhouseEngine::MergeTree => "MergeTree".to_string(),
ClickhouseEngine::ReplacingMergeTree { ver, is_deleted } => build_replacing_merge_tree_ddl(
ver,
Expand Down Expand Up @@ -4927,6 +4940,16 @@ ENGINE = S3Queue('s3://my-bucket/data/*.csv', NOSIGN, 'CSV')"#;
}
}

#[test]
fn test_null_engine_roundtrip() {
let engine_str = "Null";
let engine: ClickhouseEngine = engine_str.try_into().unwrap();
assert!(matches!(engine, ClickhouseEngine::Null));

let serialized: String = engine.into();
assert_eq!(serialized, "Null");
}

#[test]
fn test_buffer_engine_round_trip() {
// Test Buffer engine with all parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1557,4 +1557,20 @@ pub mod tests {
let indexes = extract_indexes_from_create_table(NESTED_OBJECTS_SQL).unwrap();
assert_eq!(indexes.len(), 0);
}

#[test]
fn test_extract_null_engine() {
// Test for engine = Null
let sql = "CREATE TABLE test (x Int32) ENGINE = Null";
let result = extract_engine_from_create_table(sql);
assert_eq!(result, Some("Null".to_string()));
}

#[test]
fn test_extract_null_engine_lowercase() {
// Test for engine = null (lowercase)
let sql = "create table test (x Int32) engine = null";
let result = extract_engine_from_create_table(sql);
assert_eq!(result, Some("null".to_string()));
}
}
12 changes: 9 additions & 3 deletions packages/py-moose-lib/moose_lib/blocks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import warnings
from abc import ABC
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Any, Union
from abc import ABC
import warnings
from typing import Any, Dict, List, Optional, Union


class ClickHouseEngines(Enum):
Null = "Null"
MergeTree = "MergeTree"
ReplacingMergeTree = "ReplacingMergeTree"
SummingMergeTree = "SummingMergeTree"
Expand All @@ -31,6 +32,11 @@ class EngineConfig(ABC):
"""Base class for engine configurations"""
pass

@dataclass
class NullEngine(EngineConfig):
"""Configuration for Null engine"""
pass

@dataclass
class MergeTreeEngine(EngineConfig):
"""Configuration for MergeTree engine"""
Expand Down
36 changes: 27 additions & 9 deletions packages/py-moose-lib/moose_lib/dmv2/olap_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,32 @@
"""
import json
import warnings
from dataclasses import dataclass
from typing import (
Any,
Generic,
Iterator,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
)

from clickhouse_connect import get_client
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.exceptions import ClickHouseError
from dataclasses import dataclass
from pydantic import BaseModel
from typing import List, Optional, Any, Literal, Union, Tuple, TypeVar, Generic, Iterator

from ..blocks import ClickHouseEngines, EngineConfig
from ..commons import Logger
from ..config.runtime import RuntimeClickHouseConfig
from ..data_models import Column, _to_columns, is_array_nested_type, is_nested_type
from ..utilities.sql import quote_identifier
from .types import TypedMooseResource, T, Cols
from ._registry import _tables
from ..data_models import Column, is_array_nested_type, is_nested_type, _to_columns
from .life_cycle import LifeCycle
from .types import Cols, T, TypedMooseResource


@dataclass
Expand Down Expand Up @@ -159,11 +171,17 @@ def model_post_init(self, __context):

# Validate that non-MergeTree engines don't have unsupported clauses
if self.engine:
from ..blocks import S3Engine, S3QueueEngine, BufferEngine, DistributedEngine
from ..blocks import (
BufferEngine,
DistributedEngine,
NullEngine,
S3Engine,
S3QueueEngine,
)

# S3QueueEngine, BufferEngine, and DistributedEngine don't support ORDER BY
# Note: S3Engine DOES support ORDER BY (unlike S3Queue)
engines_without_order_by = (S3QueueEngine, BufferEngine, DistributedEngine)
engines_without_order_by = (NullEngine, S3QueueEngine, BufferEngine, DistributedEngine)
if isinstance(self.engine, engines_without_order_by):
engine_name = type(self.engine).__name__

Expand All @@ -174,7 +192,7 @@ def model_post_init(self, __context):
)

# All non-MergeTree engines don't support SAMPLE BY
engines_without_sample_by = (S3Engine, S3QueueEngine, BufferEngine, DistributedEngine)
engines_without_sample_by = (NullEngine, S3Engine, S3QueueEngine, BufferEngine, DistributedEngine)
if isinstance(self.engine, engines_without_sample_by):
engine_name = type(self.engine).__name__

Expand All @@ -186,7 +204,7 @@ def model_post_init(self, __context):

# Only S3QueueEngine, BufferEngine, and DistributedEngine don't support PARTITION BY
# S3Engine DOES support PARTITION BY
engines_without_partition_by = (S3QueueEngine, BufferEngine, DistributedEngine)
engines_without_partition_by = (NullEngine, S3QueueEngine, BufferEngine, DistributedEngine)
if isinstance(self.engine, engines_without_partition_by):
engine_name = type(self.engine).__name__

Expand Down Expand Up @@ -237,9 +255,9 @@ def __init__(self, name: str, config: OlapConfig = OlapConfig(), **kwargs):
# Validate cluster and explicit replication params are not both specified
if config.cluster:
from moose_lib.blocks import (
ReplicatedAggregatingMergeTreeEngine,
ReplicatedMergeTreeEngine,
ReplicatedReplacingMergeTreeEngine,
ReplicatedAggregatingMergeTreeEngine,
ReplicatedSummingMergeTreeEngine,
)

Expand Down
Loading