@@ -5,7 +5,7 @@ use crate::infrastructure::processes::kafka_clickhouse_sync::IPV4_PATTERN;
55use itertools:: Either ;
66use num_bigint:: { BigInt , BigUint } ;
77use regex:: Regex ;
8- use serde:: de:: { DeserializeSeed , Error , MapAccess , SeqAccess , Visitor } ;
8+ use serde:: de:: { Deserialize , DeserializeSeed , Error , MapAccess , SeqAccess , Visitor } ;
99use serde:: ser:: { SerializeMap , SerializeSeq } ;
1010use serde:: { forward_to_deserialize_any, Deserializer , Serialize , Serializer } ;
1111use serde_json:: Value ;
@@ -616,11 +616,10 @@ impl<'de, S: SerializeValue> Visitor<'de> for &mut ValueVisitor<'_, S> {
616616 {
617617 match self . t {
618618 ColumnType :: Nested ( ref fields) => {
619- let inner = DataModelVisitor :: with_context (
619+ let inner = DataModelVisitor :: with_nested_context (
620620 & fields. columns ,
621- Some ( & self . context ) ,
621+ & self . context ,
622622 self . jwt_claims ,
623- false , // Nested types don't allow extra fields
624623 ) ;
625624 let serializer = MapAccessSerializer {
626625 inner : RefCell :: new ( inner) ,
@@ -654,12 +653,8 @@ impl<'de, S: SerializeValue> Visitor<'de> for &mut ValueVisitor<'_, S> {
654653 }
655654 } )
656655 . collect ( ) ;
657- let inner = DataModelVisitor :: with_context (
658- & columns,
659- Some ( & self . context ) ,
660- self . jwt_claims ,
661- false , // NamedTuples don't allow extra fields
662- ) ;
656+ let inner =
657+ DataModelVisitor :: with_nested_context ( & columns, & self . context , self . jwt_claims ) ;
663658 let serializer = MapAccessSerializer {
664659 inner : RefCell :: new ( inner) ,
665660 map : RefCell :: new ( map) ,
@@ -1008,6 +1003,120 @@ impl<'de, A: MapAccess<'de>> Serialize for MapAccessSerializer<'de, '_, A> {
10081003 }
10091004}
10101005
1006+ /// Helper seed for streaming pass-through of arbitrary values without intermediate allocation.
1007+ /// This is used to efficiently pass through extra fields in ingest payloads.
1008+ /// For complex nested structures, falls back to serde_json::Value, but handles primitives
1009+ /// and simple types without allocation.
1010+ struct PassThroughSeed < ' a , S : SerializeMap > ( & ' a mut S ) ;
1011+
1012+ impl < ' de , S : SerializeMap > DeserializeSeed < ' de > for PassThroughSeed < ' _ , S > {
1013+ type Value = ( ) ;
1014+
1015+ fn deserialize < D > ( self , deserializer : D ) -> Result < Self :: Value , D :: Error >
1016+ where
1017+ D : Deserializer < ' de > ,
1018+ {
1019+ // Use a visitor to deserialize and immediately serialize primitives
1020+ struct PassThroughVisitor < ' a , S : SerializeMap > ( & ' a mut S ) ;
1021+
1022+ impl < ' de , S : SerializeMap > Visitor < ' de > for PassThroughVisitor < ' _ , S > {
1023+ type Value = ( ) ;
1024+
1025+ fn expecting ( & self , formatter : & mut Formatter ) -> std:: fmt:: Result {
1026+ formatter. write_str ( "any valid JSON value" )
1027+ }
1028+
1029+ fn visit_bool < E > ( self , v : bool ) -> Result < Self :: Value , E >
1030+ where
1031+ E : serde:: de:: Error ,
1032+ {
1033+ self . 0 . serialize_value ( & v) . map_err ( E :: custom)
1034+ }
1035+
1036+ fn visit_i64 < E > ( self , v : i64 ) -> Result < Self :: Value , E >
1037+ where
1038+ E : serde:: de:: Error ,
1039+ {
1040+ self . 0 . serialize_value ( & v) . map_err ( E :: custom)
1041+ }
1042+
1043+ fn visit_u64 < E > ( self , v : u64 ) -> Result < Self :: Value , E >
1044+ where
1045+ E : serde:: de:: Error ,
1046+ {
1047+ self . 0 . serialize_value ( & v) . map_err ( E :: custom)
1048+ }
1049+
1050+ fn visit_f64 < E > ( self , v : f64 ) -> Result < Self :: Value , E >
1051+ where
1052+ E : serde:: de:: Error ,
1053+ {
1054+ self . 0 . serialize_value ( & v) . map_err ( E :: custom)
1055+ }
1056+
1057+ fn visit_str < E > ( self , v : & str ) -> Result < Self :: Value , E >
1058+ where
1059+ E : serde:: de:: Error ,
1060+ {
1061+ self . 0 . serialize_value ( v) . map_err ( E :: custom)
1062+ }
1063+
1064+ fn visit_string < E > ( self , v : String ) -> Result < Self :: Value , E >
1065+ where
1066+ E : serde:: de:: Error ,
1067+ {
1068+ self . 0 . serialize_value ( & v) . map_err ( E :: custom)
1069+ }
1070+
1071+ fn visit_none < E > ( self ) -> Result < Self :: Value , E >
1072+ where
1073+ E : serde:: de:: Error ,
1074+ {
1075+ self . 0
1076+ . serialize_value ( & Option :: < ( ) > :: None )
1077+ . map_err ( E :: custom)
1078+ }
1079+
1080+ fn visit_some < D > ( self , deserializer : D ) -> Result < Self :: Value , D :: Error >
1081+ where
1082+ D : Deserializer < ' de > ,
1083+ {
1084+ // For Option<T>, recursively pass through the inner value
1085+ PassThroughSeed ( self . 0 ) . deserialize ( deserializer)
1086+ }
1087+
1088+ fn visit_unit < E > ( self ) -> Result < Self :: Value , E >
1089+ where
1090+ E : serde:: de:: Error ,
1091+ {
1092+ self . 0 . serialize_value ( & ( ) ) . map_err ( E :: custom)
1093+ }
1094+
1095+ fn visit_seq < A > ( self , seq : A ) -> Result < Self :: Value , A :: Error >
1096+ where
1097+ A : SeqAccess < ' de > ,
1098+ {
1099+ // For complex nested structures (arrays/objects), fall back to Value
1100+ // to avoid complexity. In practice, extra fields are usually primitives.
1101+ let value = Value :: deserialize ( serde:: de:: value:: SeqAccessDeserializer :: new ( seq) ) ?;
1102+ self . 0 . serialize_value ( & value) . map_err ( A :: Error :: custom)
1103+ }
1104+
1105+ fn visit_map < A > ( self , map : A ) -> Result < Self :: Value , A :: Error >
1106+ where
1107+ A : MapAccess < ' de > ,
1108+ {
1109+ // For complex nested structures (arrays/objects), fall back to Value
1110+ // to avoid complexity. In practice, extra fields are usually primitives.
1111+ let value = Value :: deserialize ( serde:: de:: value:: MapAccessDeserializer :: new ( map) ) ?;
1112+ self . 0 . serialize_value ( & value) . map_err ( A :: Error :: custom)
1113+ }
1114+ }
1115+
1116+ deserializer. deserialize_any ( PassThroughVisitor ( self . 0 ) )
1117+ }
1118+ }
1119+
10111120pub struct DataModelVisitor < ' a > {
10121121 columns : HashMap < String , ( Column , State ) > ,
10131122 parent_context : Option < & ' a ParentContext < ' a > > ,
@@ -1043,6 +1152,16 @@ impl<'a> DataModelVisitor<'a> {
10431152 }
10441153 }
10451154
1155+ /// Create a visitor for nested structures (Nested types and NamedTuples).
1156+ /// These types don't allow extra fields, so `allow_extra_fields` is always false.
1157+ fn with_nested_context (
1158+ columns : & [ Column ] ,
1159+ parent_context : & ' a ParentContext < ' a > ,
1160+ jwt_claims : Option < & ' a Value > ,
1161+ ) -> Self {
1162+ Self :: with_context ( columns, Some ( parent_context) , jwt_claims, false )
1163+ }
1164+
10461165 fn transfer_map_access_to_serialize_map < ' de , A : MapAccess < ' de > , S : SerializeMap > (
10471166 & mut self ,
10481167 map : & mut A ,
@@ -1069,13 +1188,11 @@ impl<'a> DataModelVisitor<'a> {
10691188 map. next_value_seed ( & mut visitor) ?;
10701189 } else if self . allow_extra_fields {
10711190 // Pass through extra fields when allowed (e.g., for types with index signatures)
1072- let value : Value = map . next_value ( ) ? ;
1191+ // Use streaming pass-through to avoid intermediate Value allocation
10731192 map_serializer
10741193 . serialize_key ( & key)
10751194 . map_err ( A :: Error :: custom) ?;
1076- map_serializer
1077- . serialize_value ( & value)
1078- . map_err ( A :: Error :: custom) ?;
1195+ map. next_value_seed ( PassThroughSeed ( map_serializer) ) ?;
10791196 } else {
10801197 map. next_value :: < serde:: de:: IgnoredAny > ( ) ?;
10811198 }
0 commit comments