Skip to content

Commit d05b7e4

Browse files
committed
add python tests
1 parent c7f9527 commit d05b7e4

File tree

6 files changed

+299
-3
lines changed

6 files changed

+299
-3
lines changed

apps/framework-cli-e2e/test/templates.test.ts

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,145 @@ const createTemplateTestSuite = (config: TemplateTestConfig) => {
17701770
expect(apiData).to.be.an("array");
17711771
});
17721772

1773+
// Extra fields test for Python (ENG-1617)
1774+
// Tests that IngestApi accepts payloads with extra fields when the model has extra='allow'.
1775+
// Extra fields are passed through to streaming functions and stored in a JSON column.
1776+
//
1777+
// KEY CONCEPTS:
1778+
// - IngestApi/Stream with extra='allow': CAN accept variable fields
1779+
// - OlapTable: Requires fixed schema (ClickHouse needs to know columns)
1780+
// - Transform: Receives known fields as attributes and extra fields via model_extra, outputs to a fixed schema with a JSON column
1781+
it("should pass extra fields to streaming function via Pydantic extra='allow' (PY)", async function () {
1782+
this.timeout(TIMEOUTS.TEST_SETUP_MS);
1783+
1784+
const userId = randomUUID();
1785+
const timestamp = new Date().toISOString();
1786+
1787+
// Send data with known fields plus arbitrary extra fields
1788+
await withRetries(
1789+
async () => {
1790+
const response = await fetch(
1791+
`${SERVER_CONFIG.url}/ingest/userEventIngestApi`,
1792+
{
1793+
method: "POST",
1794+
headers: { "Content-Type": "application/json" },
1795+
body: JSON.stringify({
1796+
// Known fields defined in the model (snake_case for Python)
1797+
timestamp: timestamp,
1798+
event_name: "page_view",
1799+
user_id: userId,
1800+
org_id: "org-123",
1801+
// Extra fields - allowed by extra='allow', passed to streaming function
1802+
customProperty: "custom-value",
1803+
pageUrl: "/dashboard",
1804+
sessionDuration: 120,
1805+
nested: {
1806+
level1: "value1",
1807+
level2: { deep: "nested" },
1808+
},
1809+
}),
1810+
},
1811+
);
1812+
if (!response.ok) {
1813+
const text = await response.text();
1814+
throw new Error(`${response.status}: ${text}`);
1815+
}
1816+
},
1817+
{ attempts: 5, delayMs: 500 },
1818+
);
1819+
1820+
// Wait for the transform to process and write to output table
1821+
await waitForDBWrite(
1822+
devProcess!,
1823+
"UserEventOutput",
1824+
1,
1825+
60_000,
1826+
"local",
1827+
`user_id = '${userId}'`,
1828+
);
1829+
1830+
// Verify the data was written correctly
1831+
const client = createClient(CLICKHOUSE_CONFIG);
1832+
const result = await client.query({
1833+
query: `
1834+
SELECT
1835+
user_id,
1836+
event_name,
1837+
org_id,
1838+
properties
1839+
FROM local.UserEventOutput
1840+
WHERE user_id = '${userId}'
1841+
`,
1842+
format: "JSONEachRow",
1843+
});
1844+
1845+
const rows: any[] = await result.json();
1846+
await client.close();
1847+
1848+
if (rows.length === 0) {
1849+
throw new Error(
1850+
`No data found in UserEventOutput for user_id ${userId}`,
1851+
);
1852+
}
1853+
1854+
const row = rows[0];
1855+
1856+
// Verify known fields are correctly passed through (snake_case for Python)
1857+
if (row.event_name !== "page_view") {
1858+
throw new Error(
1859+
`Expected event_name to be 'page_view', got '${row.event_name}'`,
1860+
);
1861+
}
1862+
if (row.org_id !== "org-123") {
1863+
throw new Error(
1864+
`Expected org_id to be 'org-123', got '${row.org_id}'`,
1865+
);
1866+
}
1867+
1868+
// Verify extra fields are stored in the properties JSON column
1869+
if (row.properties === undefined) {
1870+
throw new Error("Expected properties JSON column to exist");
1871+
}
1872+
1873+
// Parse properties if it's a string (ClickHouse may return JSON as string)
1874+
const properties =
1875+
typeof row.properties === "string" ?
1876+
JSON.parse(row.properties)
1877+
: row.properties;
1878+
1879+
// Verify extra fields were received by streaming function via model_extra
1880+
if (properties.customProperty !== "custom-value") {
1881+
throw new Error(
1882+
`Expected properties.customProperty to be 'custom-value', got '${properties.customProperty}'. Properties: ${JSON.stringify(properties)}`,
1883+
);
1884+
}
1885+
if (properties.pageUrl !== "/dashboard") {
1886+
throw new Error(
1887+
`Expected properties.pageUrl to be '/dashboard', got '${properties.pageUrl}'`,
1888+
);
1889+
}
1890+
// Note: ClickHouse JSON may return numbers as strings
1891+
if (Number(properties.sessionDuration) !== 120) {
1892+
throw new Error(
1893+
`Expected properties.sessionDuration to be 120, got '${properties.sessionDuration}'`,
1894+
);
1895+
}
1896+
if (
1897+
!properties.nested ||
1898+
properties.nested.level1 !== "value1" ||
1899+
!properties.nested.level2 ||
1900+
properties.nested.level2.deep !== "nested"
1901+
) {
1902+
throw new Error(
1903+
`Expected nested object to be preserved, got '${JSON.stringify(properties.nested)}'`,
1904+
);
1905+
}
1906+
1907+
console.log(
1908+
"✅ Extra fields test passed (Python) - extra fields received by streaming function via model_extra and stored in properties column",
1909+
);
1910+
});
1911+
17731912
// DateTime precision test for Python
17741913
it("should preserve microsecond precision with clickhouse_datetime64 annotations via streaming transform (PY)", async function () {
17751914
this.timeout(TIMEOUTS.TEST_SETUP_MS);

apps/framework-cli-e2e/test/utils/schema-definitions.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,19 @@ export const PYTHON_TEST_SCHEMAS: ExpectedTableSchema[] = [
936936
},
937937
],
938938
},
939+
// Extra fields test table (ENG-1617)
940+
// Extra fields from Pydantic's extra='allow' are stored in properties JSON column
941+
{
942+
tableName: "UserEventOutput",
943+
columns: [
944+
{ name: "timestamp", type: /DateTime\('UTC'\)/ },
945+
{ name: "event_name", type: "String" },
946+
{ name: "user_id", type: "String" },
947+
{ name: "org_id", type: "Nullable(String)" },
948+
{ name: "project_id", type: "Nullable(String)" },
949+
{ name: "properties", type: "JSON" },
950+
],
951+
},
939952
];
940953

941954
// ============ HELPER FUNCTIONS ============

packages/ts-moose-lib/src/dmv2/sdk/ingestApi.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ export class IngestApi<T> extends TypedBase<T, IngestConfig<T>> {
3939
*/
4040
constructor(name: string, config?: IngestConfig<T>);
4141

42-
/** @internal **/
42+
/**
43+
* @internal
44+
* Note: `validators` parameter is a positional placeholder (always undefined for IngestApi).
45+
* It exists because TypedBase has validators as the 5th param, and we need to pass
46+
* allowExtraFields as the 6th param. IngestApi doesn't use validators.
47+
*/
4348
constructor(
4449
name: string,
4550
config: IngestConfig<T>,

packages/ts-moose-lib/src/dmv2/sdk/stream.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,12 @@ export class Stream<T> extends TypedBase<T, StreamConfig<T>> {
228228
*/
229229
constructor(name: string, config?: StreamConfig<T>);
230230

231-
/** @internal **/
231+
/**
232+
* @internal
233+
* Note: `validators` parameter is a positional placeholder (always undefined for Stream).
234+
* It exists because TypedBase has validators as the 5th param, and we need to pass
235+
* allowExtraFields as the 6th param. Stream doesn't use validators.
236+
*/
232237
constructor(
233238
name: string,
234239
config: StreamConfig<T>,

templates/python-tests/src/ingest/models.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from moose_lib import Point, Ring, LineString, MultiLineString, Polygon, MultiPolygon
44
from moose_lib import Key, IngestPipeline, IngestPipelineConfig, StringToEnumMixin, clickhouse_default, ClickHouseCodec, OlapTable, \
55
OlapConfig, MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, simple_aggregated, \
6-
ClickhouseSize, UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64, ClickhousePrecision
6+
ClickhouseSize, UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64, ClickhousePrecision, \
7+
IngestApi, IngestConfigWithDestination
78
from datetime import datetime, date
89
from typing import Optional, Annotated, Any
910
from pydantic import BaseModel, BeforeValidator, ConfigDict
@@ -778,3 +779,78 @@ class MaterializedTest(BaseModel):
778779
dead_letter_queue=True
779780
)
780781
)
782+
783+
784+
# =======Extra Fields Test (ENG-1617)=========
785+
# Tests the ability to accept arbitrary payload fields using Pydantic's extra='allow'
786+
# This is the Python equivalent of TypeScript's index signatures
787+
#
788+
# KEY CONCEPTS:
789+
# - IngestApi and Stream: CAN allow extra fields (accept variable payloads)
790+
# - OlapTable: Requires fixed schema (ClickHouse needs to know the columns)
791+
# - Transform functions: Receive ALL fields (including extras)
792+
# and must output to a fixed schema for OlapTable storage
793+
#
794+
# DATA FLOW:
795+
# IngestApi (variable) → Stream (variable) → Transform → Stream (fixed) → OlapTable (fixed)
796+
797+
class UserEventInput(BaseModel):
798+
"""Input type that allows arbitrary additional fields.
799+
800+
Known fields (timestamp, event_name, user_id, and the optional org_id and project_id) are validated.
801+
Unknown fields are passed through to streaming functions via extra='allow'.
802+
"""
803+
model_config = ConfigDict(extra='allow') # Allow arbitrary extra fields
804+
805+
timestamp: datetime
806+
event_name: str
807+
user_id: Key[str]
808+
# Optional known fields
809+
org_id: Optional[str] = None
810+
project_id: Optional[str] = None
811+
812+
813+
class UserEventOutput(BaseModel):
814+
"""Output type with a FIXED schema for OlapTable storage.
815+
816+
Extra fields from the input are stored in a JSON column.
817+
OlapTable requires fixed columns - ClickHouse needs to know the schema.
818+
"""
819+
timestamp: datetime
820+
event_name: str
821+
user_id: Key[str]
822+
org_id: Optional[str] = None
823+
project_id: Optional[str] = None
824+
# JSON column for extra properties
825+
# This is how you persist variable fields to ClickHouse
826+
properties: dict[str, Any]
827+
828+
829+
# Input stream for raw events (accepts variable fields via extra='allow')
830+
user_event_input_stream = Stream[UserEventInput](
831+
"UserEventInput",
832+
StreamConfig(destination=None)
833+
)
834+
835+
# IngestApi accepting arbitrary payload fields
836+
user_event_ingest_api = IngestApi[UserEventInput](
837+
"userEventIngestApi",
838+
IngestConfigWithDestination(
839+
destination=user_event_input_stream,
840+
version="0.1"
841+
)
842+
)
843+
844+
# Output table with fixed schema (JSON column stores variable data)
845+
user_event_output_table = OlapTable[UserEventOutput](
846+
"UserEventOutput",
847+
OlapConfig(
848+
order_by_fields=["user_id", "timestamp"]
849+
)
850+
)
851+
852+
# Stream for processed events (fixed schema)
853+
user_event_output_stream = Stream[UserEventOutput](
854+
"UserEventOutput",
855+
StreamConfig(destination=user_event_output_table)
856+
)

templates/python-tests/src/ingest/transforms.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,61 @@ def datetime_precision_transform(input_data: DateTimePrecisionTestData) -> DateT
146146
destination=datetime_precision_output_stream,
147147
transformation=datetime_precision_transform,
148148
)
149+
150+
151+
# ============================================================================
152+
# Extra Fields Transform (ENG-1617)
153+
# Demonstrates how streaming functions receive extra fields from Pydantic's extra='allow'
154+
# ============================================================================
155+
156+
from src.ingest.models import (
157+
user_event_input_stream,
158+
user_event_output_stream,
159+
UserEventInput,
160+
UserEventOutput,
161+
)
162+
163+
164+
def user_event_transform(input_data: UserEventInput) -> UserEventOutput:
165+
"""Transform for extra fields types.
166+
167+
KEY POINT: The streaming function receives ALL fields from the input,
168+
including extra fields allowed by Pydantic's `model_config = ConfigDict(extra='allow')`.
169+
170+
Since OlapTable requires a fixed schema, we extract known fields and store
171+
extra fields in a JSON column (`properties`).
172+
"""
173+
# Get known fields
174+
known_fields = {
175+
"timestamp": input_data.timestamp,
176+
"event_name": input_data.event_name,
177+
"user_id": input_data.user_id,
178+
"org_id": input_data.org_id,
179+
"project_id": input_data.project_id,
180+
}
181+
182+
# Get extra fields using model_extra (Pydantic v2)
183+
# This contains all fields that were not defined in the model schema
184+
extra_fields = input_data.model_extra or {}
185+
186+
# Log to demonstrate that extra fields ARE received by the streaming function
187+
print("Extra fields transform received:", {
188+
"known_fields": known_fields,
189+
"extra_fields": extra_fields, # These came through extra='allow'!
190+
})
191+
192+
return UserEventOutput(
193+
timestamp=input_data.timestamp,
194+
event_name=input_data.event_name,
195+
user_id=input_data.user_id,
196+
org_id=input_data.org_id,
197+
project_id=input_data.project_id,
198+
# Store extra fields in JSON column for persistence to ClickHouse
199+
properties=extra_fields,
200+
)
201+
202+
203+
user_event_input_stream.add_transform(
204+
destination=user_event_output_stream,
205+
transformation=user_event_transform,
206+
)

0 commit comments

Comments
 (0)