Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions apps/framework-cli-e2e/test/templates.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,146 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
}
});

// Index signature test for TypeScript (ENG-1617)
// Tests that IngestApi accepts payloads with extra fields when the type has an index signature.
// Extra fields are passed through to streaming functions and stored in a JSON column.
//
// KEY CONCEPTS:
// - IngestApi/Stream: CAN have index signatures (accept variable fields)
// - OlapTable: CANNOT have index signatures (ClickHouse requires fixed schema)
// - Transform: Receives ALL fields, outputs to fixed schema with JSON column for extras

it("should pass extra fields to streaming function via index signature", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const userId = randomUUID();
const timestamp = new Date().toISOString();

// Send data with known fields plus arbitrary extra fields
await withRetries(
async () => {
const response = await fetch(
`${SERVER_CONFIG.url}/ingest/userEventIngestApi`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
// Known fields defined in the type
timestamp: timestamp,
eventName: "page_view",
userId: userId,
orgId: "org-123",
// Extra fields - allowed by index signature, passed to streaming function
customProperty: "custom-value",
pageUrl: "/dashboard",
sessionDuration: 120,
nested: {
level1: "value1",
level2: { deep: "nested" },
},
}),
},
);
if (!response.ok) {
const text = await response.text();
throw new Error(`${response.status}: ${text}`);
}
},
{ attempts: 5, delayMs: 500 },
);

// Wait for the transform to process and write to output table
await waitForDBWrite(
devProcess!,
"UserEventOutput",
1,
60_000,
"local",
`userId = '${userId}'`,
);

// Verify the data was written correctly
const client = createClient(CLICKHOUSE_CONFIG);
const result = await client.query({
query: `
SELECT
userId,
eventName,
orgId,
properties
FROM local.UserEventOutput
WHERE userId = '${userId}'
`,
format: "JSONEachRow",
});

const rows: any[] = await result.json();
await client.close();

if (rows.length === 0) {
throw new Error(
`No data found in UserEventOutput for userId ${userId}`,
);
}

const row = rows[0];

// Verify known fields are correctly passed through
if (row.eventName !== "page_view") {
throw new Error(
`Expected eventName to be 'page_view', got '${row.eventName}'`,
);
}
if (row.orgId !== "org-123") {
throw new Error(
`Expected orgId to be 'org-123', got '${row.orgId}'`,
);
}

// Verify extra fields are stored in the properties JSON column
if (row.properties === undefined) {
throw new Error("Expected properties JSON column to exist");
}

// Parse properties if it's a string (ClickHouse may return JSON as string)
const properties =
typeof row.properties === "string" ?
JSON.parse(row.properties)
: row.properties;

// Verify extra fields were received by streaming function and stored in properties
if (properties.customProperty !== "custom-value") {
throw new Error(
`Expected properties.customProperty to be 'custom-value', got '${properties.customProperty}'. Properties: ${JSON.stringify(properties)}`,
);
}
if (properties.pageUrl !== "/dashboard") {
throw new Error(
`Expected properties.pageUrl to be '/dashboard', got '${properties.pageUrl}'`,
);
}
// Note: ClickHouse JSON may return numbers as strings
if (Number(properties.sessionDuration) !== 120) {
throw new Error(
`Expected properties.sessionDuration to be 120, got '${properties.sessionDuration}'`,
);
}
if (
!properties.nested ||
properties.nested.level1 !== "value1" ||
!properties.nested.level2 ||
properties.nested.level2.deep !== "nested"
) {
throw new Error(
`Expected nested object to be preserved, got '${JSON.stringify(properties.nested)}'`,
);
}

console.log(
"✅ Index signature test passed - extra fields received by streaming function and stored in properties column",
);
});

// DateTime precision test for TypeScript
it("should preserve microsecond precision with DateTime64String types via streaming transform", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);
Expand Down Expand Up @@ -1630,6 +1770,145 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
expect(apiData).to.be.an("array");
});

// Extra fields test for Python (ENG-1617)
// Tests that IngestApi accepts payloads with extra fields when the model has extra='allow'.
// Extra fields are passed through to streaming functions and stored in a JSON column.
//
// KEY CONCEPTS:
// - IngestApi/Stream with extra='allow': CAN accept variable fields
// - OlapTable: Requires fixed schema (ClickHouse needs to know columns)
// - Transform: Receives ALL fields via model_extra, outputs to fixed schema with JSON column
it("should pass extra fields to streaming function via Pydantic extra='allow' (PY)", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);

const userId = randomUUID();
const timestamp = new Date().toISOString();

// Send data with known fields plus arbitrary extra fields
await withRetries(
async () => {
const response = await fetch(
`${SERVER_CONFIG.url}/ingest/userEventIngestApi`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
// Known fields defined in the model (snake_case for Python)
timestamp: timestamp,
event_name: "page_view",
user_id: userId,
org_id: "org-123",
// Extra fields - allowed by extra='allow', passed to streaming function
customProperty: "custom-value",
pageUrl: "/dashboard",
sessionDuration: 120,
nested: {
level1: "value1",
level2: { deep: "nested" },
},
}),
},
);
if (!response.ok) {
const text = await response.text();
throw new Error(`${response.status}: ${text}`);
}
},
{ attempts: 5, delayMs: 500 },
);

// Wait for the transform to process and write to output table
await waitForDBWrite(
devProcess!,
"UserEventOutput",
1,
60_000,
"local",
`user_id = '${userId}'`,
);

// Verify the data was written correctly
const client = createClient(CLICKHOUSE_CONFIG);
const result = await client.query({
query: `
SELECT
user_id,
event_name,
org_id,
properties
FROM local.UserEventOutput
WHERE user_id = '${userId}'
`,
format: "JSONEachRow",
});

const rows: any[] = await result.json();
await client.close();

if (rows.length === 0) {
throw new Error(
`No data found in UserEventOutput for user_id ${userId}`,
);
}

const row = rows[0];

// Verify known fields are correctly passed through (snake_case for Python)
if (row.event_name !== "page_view") {
throw new Error(
`Expected event_name to be 'page_view', got '${row.event_name}'`,
);
}
if (row.org_id !== "org-123") {
throw new Error(
`Expected org_id to be 'org-123', got '${row.org_id}'`,
);
}

// Verify extra fields are stored in the properties JSON column
if (row.properties === undefined) {
throw new Error("Expected properties JSON column to exist");
}

// Parse properties if it's a string (ClickHouse may return JSON as string)
const properties =
typeof row.properties === "string" ?
JSON.parse(row.properties)
: row.properties;

// Verify extra fields were received by streaming function via model_extra
if (properties.customProperty !== "custom-value") {
throw new Error(
`Expected properties.customProperty to be 'custom-value', got '${properties.customProperty}'. Properties: ${JSON.stringify(properties)}`,
);
}
if (properties.pageUrl !== "/dashboard") {
throw new Error(
`Expected properties.pageUrl to be '/dashboard', got '${properties.pageUrl}'`,
);
}
// Note: ClickHouse JSON may return numbers as strings
if (Number(properties.sessionDuration) !== 120) {
throw new Error(
`Expected properties.sessionDuration to be 120, got '${properties.sessionDuration}'`,
);
}
if (
!properties.nested ||
properties.nested.level1 !== "value1" ||
!properties.nested.level2 ||
properties.nested.level2.deep !== "nested"
) {
throw new Error(
`Expected nested object to be preserved, got '${JSON.stringify(properties.nested)}'`,
);
}

console.log(
"✅ Extra fields test passed (Python) - extra fields received by streaming function via model_extra and stored in properties column",
);
});

// DateTime precision test for Python
it("should preserve microsecond precision with clickhouse_datetime64 annotations via streaming transform (PY)", async function () {
this.timeout(TIMEOUTS.TEST_SETUP_MS);
Expand Down
26 changes: 26 additions & 0 deletions apps/framework-cli-e2e/test/utils/schema-definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,19 @@ export const TYPESCRIPT_TEST_SCHEMAS: ExpectedTableSchema[] = [
{ name: "payloadBasic", type: "JSON(count Int64, name String)" },
],
},
// Index signature test table (ENG-1617)
// Extra fields from index signature are stored in properties JSON column
{
tableName: "UserEventOutput",
columns: [
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
{ name: "eventName", type: "String" },
{ name: "userId", type: "String" },
{ name: "orgId", type: "Nullable(String)" },
{ name: "projectId", type: "Nullable(String)" },
{ name: "properties", type: "JSON" },
],
},
// Primary Key Expression Tests
{
tableName: "PrimaryKeyExpressionTest",
Expand Down Expand Up @@ -923,6 +936,19 @@ export const PYTHON_TEST_SCHEMAS: ExpectedTableSchema[] = [
},
],
},
// Extra fields test table (ENG-1617)
// Extra fields from Pydantic's extra='allow' are stored in properties JSON column
{
tableName: "UserEventOutput",
columns: [
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
{ name: "event_name", type: "String" },
{ name: "user_id", type: "String" },
{ name: "org_id", type: "Nullable(String)" },
{ name: "project_id", type: "Nullable(String)" },
{ name: "properties", type: "JSON" },
],
},
];

// ============ HELPER FUNCTIONS ============
Expand Down
10 changes: 7 additions & 3 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1363,9 +1363,13 @@ async fn handle_json_array_body(
body.len(),
topic_name
);
let parsed = JsonDeserializer::from_slice(&body).deserialize_any(&mut DataModelArrayVisitor {
inner: DataModelVisitor::new(&data_model.columns, jwt_claims.as_ref()),
});
let visitor = if data_model.allow_extra_fields {
DataModelVisitor::new_with_extra_fields(&data_model.columns, jwt_claims.as_ref())
} else {
DataModelVisitor::new(&data_model.columns, jwt_claims.as_ref())
};
let parsed = JsonDeserializer::from_slice(&body)
.deserialize_any(&mut DataModelArrayVisitor { inner: visitor });

debug!("parsed json array for {}", topic_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ struct PartialIngestApi {
pub path: Option<String>,
#[serde(default)]
pub schema: serde_json::Map<String, serde_json::Value>,
/// Whether this API allows extra fields beyond the defined columns.
/// When true, extra fields in payloads are passed through to streaming functions.
#[serde(default)]
pub allow_extra_fields: bool,
}

/// Represents an egress API endpoint definition before conversion to a complete [`ApiEndpoint`].
Expand Down Expand Up @@ -1001,6 +1005,7 @@ impl PartialInfrastructureMap {
// If this is the app directory, we should use the project reference so that
// if we rename the app folder we don't have to fish for references
abs_file_path: main_file.to_path_buf(),
allow_extra_fields: partial_api.allow_extra_fields,
};

let api_endpoint = ApiEndpoint {
Expand Down
4 changes: 4 additions & 0 deletions apps/framework-cli/src/framework/data_model/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub struct DataModel {
pub config: DataModelConfig,
pub abs_file_path: PathBuf,
pub version: Version,
/// Whether this data model allows extra fields beyond the defined columns.
/// When true, extra fields in payloads are passed through to streaming functions.
#[serde(default)]
pub allow_extra_fields: bool,
}

impl DataModel {
Expand Down
1 change: 1 addition & 0 deletions apps/framework-cli/src/framework/python/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ fn python_class_to_framework_datamodel(
columns,
name: class_name,
config: Default::default(),
allow_extra_fields: false,
})
}

Expand Down
Loading
Loading