Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 311 additions & 0 deletions apps/framework-cli-e2e/test/collapsing-merge-tree.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
/// <reference types="node" />
/// <reference types="mocha" />
/// <reference types="chai" />

/**
* End-to-end tests for CollapsingMergeTree and VersionedCollapsingMergeTree engines.
*
* These tests verify that:
* 1. Tables using CollapsingMergeTree and VersionedCollapsingMergeTree engines are created correctly
* 2. Both regular and replicated variants work properly
* 3. The sign and version parameters are correctly passed to ClickHouse
*/

import { spawn, ChildProcess } from "child_process";
import { expect } from "chai";
import * as path from "path";

// Import constants and utilities
import {
TIMEOUTS,
TEMPLATE_NAMES,
APP_NAMES,
SERVER_CONFIG,
} from "./constants";

import {
waitForServerStart,
waitForInfrastructureReady,
waitForStreamingFunctions,
cleanupTestSuite,
performGlobalCleanup,
createTempTestDirectory,
setupTypeScriptProject,
setupPythonProject,
getTableDDL,
} from "./utils";

const CLI_PATH = path.resolve(__dirname, "../../../target/debug/moose-cli");
const MOOSE_LIB_PATH = path.resolve(
__dirname,
"../../../packages/ts-moose-lib",
);
const MOOSE_PY_LIB_PATH = path.resolve(
__dirname,
"../../../packages/py-moose-lib",
);

const TEST_PACKAGE_MANAGER = (process.env.TEST_PACKAGE_MANAGER || "npm") as
| "npm"
| "pnpm"
| "pip";

describe("CollapsingMergeTree and VersionedCollapsingMergeTree Engine Tests", function () {
describe("TypeScript Template - CollapsingMergeTree Engines", function () {
let devProcess: ChildProcess | null = null;
let testDir: string = "";
const appName = APP_NAMES.TYPESCRIPT_TESTS;

before(async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);
console.log("\n🚀 Setting up TypeScript CollapsingMergeTree test...\n");

testDir = createTempTestDirectory("ts-collapsing-mt");
console.log(`Created temporary directory: ${testDir}`);

console.log("Setting up TypeScript project...");
await setupTypeScriptProject(
testDir,
TEMPLATE_NAMES.TYPESCRIPT_TESTS,
CLI_PATH,
MOOSE_LIB_PATH,
appName,
TEST_PACKAGE_MANAGER as "npm" | "pnpm",
);

console.log("Starting dev server...");
devProcess = spawn(CLI_PATH, ["dev"], {
stdio: "pipe",
cwd: testDir,
env: process.env,
});

console.log("Waiting for server to start...");
await waitForServerStart(
devProcess,
TIMEOUTS.SERVER_STARTUP_MS,
SERVER_CONFIG.startupMessage,
SERVER_CONFIG.url,
);

console.log("Waiting for streaming functions...");
await waitForStreamingFunctions();

console.log("Waiting for infrastructure to be ready...");
await waitForInfrastructureReady();

console.log("✅ TypeScript test setup completed successfully\n");
});

after(async function () {
this.timeout(TIMEOUTS.CLEANUP_MS);
await cleanupTestSuite(devProcess, testDir, appName, {
logPrefix: "TypeScript CollapsingMergeTree test",
});
});

it("should create CollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL("CollapsingMergeTreeTest", "local");
console.log("CollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has CollapsingMergeTree engine
expect(ddl).to.include("CollapsingMergeTree");
expect(ddl).to.include("`sign`");
console.log("✅ CollapsingMergeTree table created successfully");
});

it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"VersionedCollapsingMergeTreeTest",
"local",
);
console.log("VersionedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has VersionedCollapsingMergeTree engine
expect(ddl).to.include("VersionedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
expect(ddl).to.include("`version`");
console.log("✅ VersionedCollapsingMergeTree table created successfully");
});

it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"ReplicatedCollapsingMergeTreeTest",
"local",
);
console.log("ReplicatedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has ReplicatedCollapsingMergeTree engine
expect(ddl).to.include("ReplicatedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
// Verify it has replication parameters (keeper path and replica name)
expect(ddl).to.match(
/ReplicatedCollapsingMergeTree\([^)]*replicated_collapsing_test[^)]*\)/,
);
console.log(
"✅ ReplicatedCollapsingMergeTree table created successfully",
);
});

it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"ReplicatedVersionedCollapsingMergeTreeTest",
"local",
);
console.log("ReplicatedVersionedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has ReplicatedVersionedCollapsingMergeTree engine
expect(ddl).to.include("ReplicatedVersionedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
expect(ddl).to.include("`version`");
// Verify it has replication parameters
expect(ddl).to.match(
/ReplicatedVersionedCollapsingMergeTree\([^)]*replicated_versioned_collapsing_test[^)]*\)/,
);
console.log(
"✅ ReplicatedVersionedCollapsingMergeTree table created successfully",
);
});
});

describe("Python Template - CollapsingMergeTree Engines", function () {
let devProcess: ChildProcess | null = null;
let testDir: string = "";
const appName = APP_NAMES.PYTHON_TESTS;

before(async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);
console.log("\n🚀 Setting up Python CollapsingMergeTree test...\n");

testDir = createTempTestDirectory("py-collapsing-mt");
console.log(`Created temporary directory: ${testDir}`);

console.log("Setting up Python project...");
await setupPythonProject(
testDir,
TEMPLATE_NAMES.PYTHON_TESTS,
CLI_PATH,
MOOSE_PY_LIB_PATH,
appName,
);

console.log("Starting dev server...");
devProcess = spawn(CLI_PATH, ["dev"], {
stdio: "pipe",
cwd: testDir,
env: {
...process.env,
VIRTUAL_ENV: path.join(testDir, ".venv"),
PATH: `${path.join(testDir, ".venv", "bin")}:${process.env.PATH}`,
},
});

console.log("Waiting for server to start...");
await waitForServerStart(
devProcess,
TIMEOUTS.SERVER_STARTUP_MS,
SERVER_CONFIG.startupMessage,
SERVER_CONFIG.url,
);

console.log("Waiting for streaming functions...");
await waitForStreamingFunctions();

console.log("Waiting for infrastructure to be ready...");
await waitForInfrastructureReady();

console.log("✅ Python test setup completed successfully\n");
});

after(async function () {
this.timeout(TIMEOUTS.CLEANUP_MS);
await cleanupTestSuite(devProcess, testDir, appName, {
logPrefix: "Python CollapsingMergeTree test",
});
});

it("should create CollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL("CollapsingMergeTreeTest", "local");
console.log("CollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has CollapsingMergeTree engine
expect(ddl).to.include("CollapsingMergeTree");
expect(ddl).to.include("`sign`");
console.log("✅ CollapsingMergeTree table created successfully");
});

it("should create VersionedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"VersionedCollapsingMergeTreeTest",
"local",
);
console.log("VersionedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has VersionedCollapsingMergeTree engine
expect(ddl).to.include("VersionedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
expect(ddl).to.include("`version`");
console.log("✅ VersionedCollapsingMergeTree table created successfully");
});

it("should create ReplicatedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"ReplicatedCollapsingMergeTreeTest",
"local",
);
console.log("ReplicatedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has ReplicatedCollapsingMergeTree engine
expect(ddl).to.include("ReplicatedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
// Verify it has replication parameters
expect(ddl).to.match(
/ReplicatedCollapsingMergeTree\([^)]*replicated_collapsing_test[^)]*\)/,
);
console.log(
"✅ ReplicatedCollapsingMergeTree table created successfully",
);
});

it("should create ReplicatedVersionedCollapsingMergeTree table with correct engine configuration", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const ddl = await getTableDDL(
"ReplicatedVersionedCollapsingMergeTreeTest",
"local",
);
console.log("ReplicatedVersionedCollapsingMergeTreeTest DDL:", ddl);

// Verify the table exists and has ReplicatedVersionedCollapsingMergeTree engine
expect(ddl).to.include("ReplicatedVersionedCollapsingMergeTree");
expect(ddl).to.include("`sign`");
expect(ddl).to.include("`version`");
// Verify it has replication parameters
expect(ddl).to.match(
/ReplicatedVersionedCollapsingMergeTree\([^)]*replicated_versioned_collapsing_test[^)]*\)/,
);
console.log(
"✅ ReplicatedVersionedCollapsingMergeTree table created successfully",
);
});
});

after(async function () {
this.timeout(TIMEOUTS.GLOBAL_CLEANUP_MS);
await performGlobalCleanup();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ enum EngineConfig {
columns: Option<Vec<String>>,
},

#[serde(rename = "CollapsingMergeTree")]
CollapsingMergeTree { sign: String },

#[serde(rename = "VersionedCollapsingMergeTree")]
VersionedCollapsingMergeTree { sign: String, ver: String },

#[serde(rename = "ReplicatedMergeTree")]
ReplicatedMergeTree {
#[serde(alias = "keeperPath", default)]
Expand Down Expand Up @@ -227,6 +233,25 @@ enum EngineConfig {
columns: Option<Vec<String>>,
},

#[serde(rename = "ReplicatedCollapsingMergeTree")]
ReplicatedCollapsingMergeTree {
#[serde(alias = "keeperPath", default)]
keeper_path: Option<String>,
#[serde(alias = "replicaName", default)]
replica_name: Option<String>,
sign: String,
},

#[serde(rename = "ReplicatedVersionedCollapsingMergeTree")]
ReplicatedVersionedCollapsingMergeTree {
#[serde(alias = "keeperPath", default)]
keeper_path: Option<String>,
#[serde(alias = "replicaName", default)]
replica_name: Option<String>,
sign: String,
ver: String,
},

#[serde(rename = "S3Queue")]
S3Queue(Box<S3QueueConfig>),

Expand Down Expand Up @@ -791,6 +816,17 @@ impl PartialInfrastructureMap {
})
}

Some(EngineConfig::CollapsingMergeTree { sign }) => {
Ok(ClickhouseEngine::CollapsingMergeTree { sign: sign.clone() })
}

Some(EngineConfig::VersionedCollapsingMergeTree { sign, ver }) => {
Ok(ClickhouseEngine::VersionedCollapsingMergeTree {
sign: sign.clone(),
version: ver.clone(),
})
}

Some(EngineConfig::ReplicatedMergeTree {
keeper_path,
replica_name,
Expand Down Expand Up @@ -829,6 +865,28 @@ impl PartialInfrastructureMap {
columns: columns.clone(),
}),

Some(EngineConfig::ReplicatedCollapsingMergeTree {
keeper_path,
replica_name,
sign,
}) => Ok(ClickhouseEngine::ReplicatedCollapsingMergeTree {
keeper_path: keeper_path.clone(),
replica_name: replica_name.clone(),
sign: sign.clone(),
}),

Some(EngineConfig::ReplicatedVersionedCollapsingMergeTree {
keeper_path,
replica_name,
sign,
ver,
}) => Ok(ClickhouseEngine::ReplicatedVersionedCollapsingMergeTree {
keeper_path: keeper_path.clone(),
replica_name: replica_name.clone(),
sign: sign.clone(),
version: ver.clone(),
}),

Some(EngineConfig::S3Queue(config)) => {
// Keep environment variable markers as-is - credentials will be resolved at runtime
// S3Queue settings are handled in table_settings, not in the engine
Expand Down
Loading
Loading