Skip to content

Commit 11a7984

Browse files
authored
feat: enable arbitrary json in the ingest apis (#3047)
<!-- CURSOR_SUMMARY --> > [!NOTE] > Adds end-to-end support for accepting arbitrary JSON fields in IngestApi/Stream (TS index signatures, Python extra='allow') and passing them through to streaming functions, with validation, infra wiring, tests, and docs. > > - **Ingestion pipeline: arbitrary fields support** > - TS: Allow index signatures for `IngestApi`, `Stream`, and `IngestPipeline` (when `table=false`); inject `allowExtraFields` through `TypedBase`, `Stream`, `IngestApi`, and `IngestPipeline`. > - PY: Treat `dict[str, Any]` and `Any` as `JSON`; detect Pydantic `extra='allow'` and emit `allow_extra_fields` in infra map. > - Infra/Rust: Add `allow_extra_fields` to `DataModel` and ingest API wiring; enhance `DataModelVisitor` with pass-through for extra fields using streaming `PassThroughSeed`; route to Kafka unchanged. > - Infra map: Carry `allow_extra_fields` from partial -> full map for ingress APIs. > - **Compiler/validation** > - TS compiler plugin: `toColumns` option to allow index signatures; block `IngestPipeline` with table+index signatures; propagate flag into constructors. > - **Tests & e2e** > - Add TS unit tests for index signature validation and extensive e2e tests (TS/PY) verifying extra fields pass to transforms and persist in `UserEventOutput.properties`. > - Update expected schemas to include `local.UserEventOutput` (TS/PY variants). > - **Docs** > - Document accepting arbitrary fields for TS (index signatures) and PY (`extra='allow'`), with transform examples. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 40393f0. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 3e4bc36 commit 11a7984

File tree

24 files changed

+1355
-23
lines changed

24 files changed

+1355
-23
lines changed

apps/framework-cli-e2e/test/templates.test.ts

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,146 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
12661266
}
12671267
});
12681268

1269+
// Index signature test for TypeScript (ENG-1617)
1270+
// Tests that IngestApi accepts payloads with extra fields when the type has an index signature.
1271+
// Extra fields are passed through to streaming functions and stored in a JSON column.
1272+
//
1273+
// KEY CONCEPTS:
1274+
// - IngestApi/Stream: CAN have index signatures (accept variable fields)
1275+
// - OlapTable: CANNOT have index signatures (ClickHouse requires fixed schema)
1276+
// - Transform: Receives ALL fields, outputs to fixed schema with JSON column for extras
1277+
1278+
it("should pass extra fields to streaming function via index signature", async function () {
1279+
this.timeout(TIMEOUTS.TEST_SETUP_MS);
1280+
1281+
const userId = randomUUID();
1282+
const timestamp = new Date().toISOString();
1283+
1284+
// Send data with known fields plus arbitrary extra fields
1285+
await withRetries(
1286+
async () => {
1287+
const response = await fetch(
1288+
`${SERVER_CONFIG.url}/ingest/userEventIngestApi`,
1289+
{
1290+
method: "POST",
1291+
headers: { "Content-Type": "application/json" },
1292+
body: JSON.stringify({
1293+
// Known fields defined in the type
1294+
timestamp: timestamp,
1295+
eventName: "page_view",
1296+
userId: userId,
1297+
orgId: "org-123",
1298+
// Extra fields - allowed by index signature, passed to streaming function
1299+
customProperty: "custom-value",
1300+
pageUrl: "/dashboard",
1301+
sessionDuration: 120,
1302+
nested: {
1303+
level1: "value1",
1304+
level2: { deep: "nested" },
1305+
},
1306+
}),
1307+
},
1308+
);
1309+
if (!response.ok) {
1310+
const text = await response.text();
1311+
throw new Error(`${response.status}: ${text}`);
1312+
}
1313+
},
1314+
{ attempts: 5, delayMs: 500 },
1315+
);
1316+
1317+
// Wait for the transform to process and write to output table
1318+
await waitForDBWrite(
1319+
devProcess!,
1320+
"UserEventOutput",
1321+
1,
1322+
60_000,
1323+
"local",
1324+
`userId = '${userId}'`,
1325+
);
1326+
1327+
// Verify the data was written correctly
1328+
const client = createClient(CLICKHOUSE_CONFIG);
1329+
const result = await client.query({
1330+
query: `
1331+
SELECT
1332+
userId,
1333+
eventName,
1334+
orgId,
1335+
properties
1336+
FROM local.UserEventOutput
1337+
WHERE userId = '${userId}'
1338+
`,
1339+
format: "JSONEachRow",
1340+
});
1341+
1342+
const rows: any[] = await result.json();
1343+
await client.close();
1344+
1345+
if (rows.length === 0) {
1346+
throw new Error(
1347+
`No data found in UserEventOutput for userId ${userId}`,
1348+
);
1349+
}
1350+
1351+
const row = rows[0];
1352+
1353+
// Verify known fields are correctly passed through
1354+
if (row.eventName !== "page_view") {
1355+
throw new Error(
1356+
`Expected eventName to be 'page_view', got '${row.eventName}'`,
1357+
);
1358+
}
1359+
if (row.orgId !== "org-123") {
1360+
throw new Error(
1361+
`Expected orgId to be 'org-123', got '${row.orgId}'`,
1362+
);
1363+
}
1364+
1365+
// Verify extra fields are stored in the properties JSON column
1366+
if (row.properties === undefined) {
1367+
throw new Error("Expected properties JSON column to exist");
1368+
}
1369+
1370+
// Parse properties if it's a string (ClickHouse may return JSON as string)
1371+
const properties =
1372+
typeof row.properties === "string" ?
1373+
JSON.parse(row.properties)
1374+
: row.properties;
1375+
1376+
// Verify extra fields were received by streaming function and stored in properties
1377+
if (properties.customProperty !== "custom-value") {
1378+
throw new Error(
1379+
`Expected properties.customProperty to be 'custom-value', got '${properties.customProperty}'. Properties: ${JSON.stringify(properties)}`,
1380+
);
1381+
}
1382+
if (properties.pageUrl !== "/dashboard") {
1383+
throw new Error(
1384+
`Expected properties.pageUrl to be '/dashboard', got '${properties.pageUrl}'`,
1385+
);
1386+
}
1387+
// Note: ClickHouse JSON may return numbers as strings
1388+
if (Number(properties.sessionDuration) !== 120) {
1389+
throw new Error(
1390+
`Expected properties.sessionDuration to be 120, got '${properties.sessionDuration}'`,
1391+
);
1392+
}
1393+
if (
1394+
!properties.nested ||
1395+
properties.nested.level1 !== "value1" ||
1396+
!properties.nested.level2 ||
1397+
properties.nested.level2.deep !== "nested"
1398+
) {
1399+
throw new Error(
1400+
`Expected nested object to be preserved, got '${JSON.stringify(properties.nested)}'`,
1401+
);
1402+
}
1403+
1404+
console.log(
1405+
"✅ Index signature test passed - extra fields received by streaming function and stored in properties column",
1406+
);
1407+
});
1408+
12691409
// DateTime precision test for TypeScript
12701410
it("should preserve microsecond precision with DateTime64String types via streaming transform", async function () {
12711411
this.timeout(TIMEOUTS.TEST_SETUP_MS);
@@ -1630,6 +1770,145 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
16301770
expect(apiData).to.be.an("array");
16311771
});
16321772

1773+
// Extra fields test for Python (ENG-1617)
1774+
// Tests that IngestApi accepts payloads with extra fields when the model has extra='allow'.
1775+
// Extra fields are passed through to streaming functions and stored in a JSON column.
1776+
//
1777+
// KEY CONCEPTS:
1778+
// - IngestApi/Stream with extra='allow': CAN accept variable fields
1779+
// - OlapTable: Requires fixed schema (ClickHouse needs to know columns)
1780+
// - Transform: Receives ALL fields via model_extra, outputs to fixed schema with JSON column
1781+
it("should pass extra fields to streaming function via Pydantic extra='allow' (PY)", async function () {
1782+
this.timeout(TIMEOUTS.TEST_SETUP_MS);
1783+
1784+
const userId = randomUUID();
1785+
const timestamp = new Date().toISOString();
1786+
1787+
// Send data with known fields plus arbitrary extra fields
1788+
await withRetries(
1789+
async () => {
1790+
const response = await fetch(
1791+
`${SERVER_CONFIG.url}/ingest/userEventIngestApi`,
1792+
{
1793+
method: "POST",
1794+
headers: { "Content-Type": "application/json" },
1795+
body: JSON.stringify({
1796+
// Known fields defined in the model (snake_case for Python)
1797+
timestamp: timestamp,
1798+
event_name: "page_view",
1799+
user_id: userId,
1800+
org_id: "org-123",
1801+
// Extra fields - allowed by extra='allow', passed to streaming function
1802+
customProperty: "custom-value",
1803+
pageUrl: "/dashboard",
1804+
sessionDuration: 120,
1805+
nested: {
1806+
level1: "value1",
1807+
level2: { deep: "nested" },
1808+
},
1809+
}),
1810+
},
1811+
);
1812+
if (!response.ok) {
1813+
const text = await response.text();
1814+
throw new Error(`${response.status}: ${text}`);
1815+
}
1816+
},
1817+
{ attempts: 5, delayMs: 500 },
1818+
);
1819+
1820+
// Wait for the transform to process and write to output table
1821+
await waitForDBWrite(
1822+
devProcess!,
1823+
"UserEventOutput",
1824+
1,
1825+
60_000,
1826+
"local",
1827+
`user_id = '${userId}'`,
1828+
);
1829+
1830+
// Verify the data was written correctly
1831+
const client = createClient(CLICKHOUSE_CONFIG);
1832+
const result = await client.query({
1833+
query: `
1834+
SELECT
1835+
user_id,
1836+
event_name,
1837+
org_id,
1838+
properties
1839+
FROM local.UserEventOutput
1840+
WHERE user_id = '${userId}'
1841+
`,
1842+
format: "JSONEachRow",
1843+
});
1844+
1845+
const rows: any[] = await result.json();
1846+
await client.close();
1847+
1848+
if (rows.length === 0) {
1849+
throw new Error(
1850+
`No data found in UserEventOutput for user_id ${userId}`,
1851+
);
1852+
}
1853+
1854+
const row = rows[0];
1855+
1856+
// Verify known fields are correctly passed through (snake_case for Python)
1857+
if (row.event_name !== "page_view") {
1858+
throw new Error(
1859+
`Expected event_name to be 'page_view', got '${row.event_name}'`,
1860+
);
1861+
}
1862+
if (row.org_id !== "org-123") {
1863+
throw new Error(
1864+
`Expected org_id to be 'org-123', got '${row.org_id}'`,
1865+
);
1866+
}
1867+
1868+
// Verify extra fields are stored in the properties JSON column
1869+
if (row.properties === undefined) {
1870+
throw new Error("Expected properties JSON column to exist");
1871+
}
1872+
1873+
// Parse properties if it's a string (ClickHouse may return JSON as string)
1874+
const properties =
1875+
typeof row.properties === "string" ?
1876+
JSON.parse(row.properties)
1877+
: row.properties;
1878+
1879+
// Verify extra fields were received by streaming function via model_extra
1880+
if (properties.customProperty !== "custom-value") {
1881+
throw new Error(
1882+
`Expected properties.customProperty to be 'custom-value', got '${properties.customProperty}'. Properties: ${JSON.stringify(properties)}`,
1883+
);
1884+
}
1885+
if (properties.pageUrl !== "/dashboard") {
1886+
throw new Error(
1887+
`Expected properties.pageUrl to be '/dashboard', got '${properties.pageUrl}'`,
1888+
);
1889+
}
1890+
// Note: ClickHouse JSON may return numbers as strings
1891+
if (Number(properties.sessionDuration) !== 120) {
1892+
throw new Error(
1893+
`Expected properties.sessionDuration to be 120, got '${properties.sessionDuration}'`,
1894+
);
1895+
}
1896+
if (
1897+
!properties.nested ||
1898+
properties.nested.level1 !== "value1" ||
1899+
!properties.nested.level2 ||
1900+
properties.nested.level2.deep !== "nested"
1901+
) {
1902+
throw new Error(
1903+
`Expected nested object to be preserved, got '${JSON.stringify(properties.nested)}'`,
1904+
);
1905+
}
1906+
1907+
console.log(
1908+
"✅ Extra fields test passed (Python) - extra fields received by streaming function via model_extra and stored in properties column",
1909+
);
1910+
});
1911+
16331912
// DateTime precision test for Python
16341913
it("should preserve microsecond precision with clickhouse_datetime64 annotations via streaming transform (PY)", async function () {
16351914
this.timeout(TIMEOUTS.TEST_SETUP_MS);

apps/framework-cli-e2e/test/utils/schema-definitions.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,19 @@ export const TYPESCRIPT_TEST_SCHEMAS: ExpectedTableSchema[] = [
421421
{ name: "payloadBasic", type: "JSON(count Int64, name String)" },
422422
],
423423
},
424+
// Index signature test table (ENG-1617)
425+
// Extra fields from index signature are stored in properties JSON column
426+
{
427+
tableName: "UserEventOutput",
428+
columns: [
429+
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
430+
{ name: "eventName", type: "String" },
431+
{ name: "userId", type: "String" },
432+
{ name: "orgId", type: "Nullable(String)" },
433+
{ name: "projectId", type: "Nullable(String)" },
434+
{ name: "properties", type: "JSON" },
435+
],
436+
},
424437
// Primary Key Expression Tests
425438
{
426439
tableName: "PrimaryKeyExpressionTest",
@@ -923,6 +936,19 @@ export const PYTHON_TEST_SCHEMAS: ExpectedTableSchema[] = [
923936
},
924937
],
925938
},
939+
// Extra fields test table (ENG-1617)
940+
// Extra fields from Pydantic's extra='allow' are stored in properties JSON column
941+
{
942+
tableName: "UserEventOutput",
943+
columns: [
944+
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
945+
{ name: "event_name", type: "String" },
946+
{ name: "user_id", type: "String" },
947+
{ name: "org_id", type: "Nullable(String)" },
948+
{ name: "project_id", type: "Nullable(String)" },
949+
{ name: "properties", type: "JSON" },
950+
],
951+
},
926952
];
927953

928954
// ============ HELPER FUNCTIONS ============

apps/framework-cli/src/cli/local_webserver.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,9 +1363,13 @@ async fn handle_json_array_body(
13631363
body.len(),
13641364
topic_name
13651365
);
1366-
let parsed = JsonDeserializer::from_slice(&body).deserialize_any(&mut DataModelArrayVisitor {
1367-
inner: DataModelVisitor::new(&data_model.columns, jwt_claims.as_ref()),
1368-
});
1366+
let visitor = if data_model.allow_extra_fields {
1367+
DataModelVisitor::new_with_extra_fields(&data_model.columns, jwt_claims.as_ref())
1368+
} else {
1369+
DataModelVisitor::new(&data_model.columns, jwt_claims.as_ref())
1370+
};
1371+
let parsed = JsonDeserializer::from_slice(&body)
1372+
.deserialize_any(&mut DataModelArrayVisitor { inner: visitor });
13691373

13701374
debug!("parsed json array for {}", topic_name);
13711375

apps/framework-cli/src/framework/core/partial_infrastructure_map.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ struct PartialIngestApi {
343343
pub path: Option<String>,
344344
#[serde(default)]
345345
pub schema: serde_json::Map<String, serde_json::Value>,
346+
/// Whether this API allows extra fields beyond the defined columns.
347+
/// When true, extra fields in payloads are passed through to streaming functions.
348+
#[serde(default)]
349+
pub allow_extra_fields: bool,
346350
}
347351

348352
/// Represents an egress API endpoint definition before conversion to a complete [`ApiEndpoint`].
@@ -1001,6 +1005,7 @@ impl PartialInfrastructureMap {
10011005
// If this is the app directory, we should use the project reference so that
10021006
// if we rename the app folder we don't have to fish for references
10031007
abs_file_path: main_file.to_path_buf(),
1008+
allow_extra_fields: partial_api.allow_extra_fields,
10041009
};
10051010

10061011
let api_endpoint = ApiEndpoint {

apps/framework-cli/src/framework/data_model/model.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ pub struct DataModel {
2121
pub config: DataModelConfig,
2222
pub abs_file_path: PathBuf,
2323
pub version: Version,
24+
/// Whether this data model allows extra fields beyond the defined columns.
25+
/// When true, extra fields in payloads are passed through to streaming functions.
26+
#[serde(default)]
27+
pub allow_extra_fields: bool,
2428
}
2529

2630
impl DataModel {

apps/framework-cli/src/framework/python/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ fn python_class_to_framework_datamodel(
258258
columns,
259259
name: class_name,
260260
config: Default::default(),
261+
allow_extra_fields: false,
261262
})
262263
}
263264

0 commit comments

Comments
 (0)