1- from typing import List , Tuple , Iterator
1+ from typing import List , Tuple , Iterator , Iterable , Any , Optional , Union
2+ from functools import wraps
23
34import pandas as pd
45
89from exasol_udf_mock_python .udf_context import UDFContext
910
1011
def check_context(f):
    """
    Decorator checking that a MockContext object has valid current group context.
    Raises a RuntimeError if this is not the case.

    :param f: Method to guard. The object it is bound to must expose a boolean
        `no_context` attribute, which is read on every call.
    :return: The wrapped method, carrying the metadata (name, docstring) of `f`.
    """
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        # Fail before delegating, so that the guarded method never runs without a context.
        if self.no_context:
            raise RuntimeError('Calling UDFContext interface when the current group context '
                               'is invalid is disallowed')
        return f(self, *args, **kwargs)

    return wrapper
25+
26+
def validate_emit(row: Tuple, columns: List[Column]):
    """
    Validates that a data row to be emitted corresponds to the definition of the output columns.
    The number of elements in the row should match the number of columns and the type of each
    element should match the type of the corresponding column. A None element is accepted for
    any column type.

    :param row: Data row.
    :param columns: Column definition. Each column must provide a `type` attribute.
    :raises ValueError: If the row and the column definition differ in length.
    :raises TypeError: If a non-None element is not an instance of its column type.
    """
    if len(row) != len(columns):
        raise ValueError(f"row {row} has not the same number of values as columns are defined")
    # The lengths are equal at this point, so zip pairs every value with its column.
    for i, (value, column) in enumerate(zip(row, columns)):
        if value is not None and not isinstance(value, column.type):
            raise TypeError(f"Value {value} ({type(value)}) at position {i} is not a {column.type}")
42+
43+
class MockContext(UDFContext):
    """
    Implementation of generic UDF Mock Context interface for a SET UDF with groups.
    This class allows iterating over groups. The functionality of the UDF Context is applicable
    to the current input group.

    Call `next_group` to iterate over groups. The `output_groups` property provides the emit
    output for all groups iterated so far including the output for the current group.

    Calling any function of the UDFContext interface when the group iterator has passed the end
    or before the first call to the `next_group` is illegal and will cause a RuntimeError.
    """

    def __init__(self, input_groups: Iterator[Group], metadata: MockMetaData):
        """
        :param input_groups: Input groups. Each group object should contain input rows for the group.

        :param metadata: The mock metadata object.
        """
        # iter() hands an iterator back unchanged, so plain iterables of groups work as well.
        self._input_groups = iter(input_groups)
        self._metadata = metadata
        # Mock context for the current group; None before the first and after the last group.
        self._current_context: Optional[StandaloneMockContext] = None
        # Output of all groups that have already been left behind by `next_group`.
        self._previous_output: List[Group] = []

    @property
    def no_context(self) -> bool:
        """Returns True if the current group context is invalid"""
        return self._current_context is None

    def next_group(self) -> bool:
        """
        Moves group iterator to the next group.
        Returns False if the iterator gets beyond the last group. Returns True otherwise.

        :raises RuntimeError: If the next input group is empty.
        """
        # Save output of the current group
        if self._current_context is not None:
            self._previous_output.append(Group(self._current_context.output))
            self._current_context = None

        # Try to get the next input group
        try:
            input_group = next(self._input_groups)
        except StopIteration:
            return False
        if len(input_group) == 0:
            raise RuntimeError("Empty input groups are not allowed")

        # Create Mock Context for the new input group
        self._current_context = StandaloneMockContext(input_group, self._metadata)
        return True

    @property
    def output_groups(self):
        """
        Output of all groups including the current one.

        A new list is returned on every access, so modifying it does not affect the context.
        """
        groups = list(self._previous_output)
        if self._current_context is not None:
            groups.append(Group(self._current_context.output))
        return groups

    def __getattr__(self, name):
        """
        Forwards the lookup of any attribute not found on this object to the current group
        context.

        :raises AttributeError: If this object has not been initialised yet.
        :raises RuntimeError: If there is no valid current group context.
        """
        # copy and pickle probe attributes on instances created without __init__. Touching
        # self._current_context there would re-enter __getattr__ and recurse without end.
        if '_current_context' not in self.__dict__:
            raise AttributeError(name)
        return self._context_attribute(name)

    @check_context
    def _context_attribute(self, name):
        """Reads attribute `name` from the current group context."""
        return getattr(self._current_context, name)

    @check_context
    def get_dataframe(self, num_rows: Union[str, int] = 'all',
                      start_col: int = 0) -> Optional[pd.DataFrame]:
        """Delegates to `get_dataframe` of the current group context."""
        return self._current_context.get_dataframe(num_rows, start_col)

    @check_context
    def next(self, reset: bool = False) -> bool:
        """Delegates to `next` of the current group context."""
        return self._current_context.next(reset)

    @check_context
    def size(self) -> int:
        """Delegates to `size` of the current group context."""
        return self._current_context.size()

    @check_context
    def reset(self) -> None:
        """Delegates to `reset` of the current group context."""
        self._current_context.reset()

    @check_context
    def emit(self, *args) -> None:
        """Delegates to `emit` of the current group context."""
        self._current_context.emit(*args)
134+
135+
def get_scalar_input(inp: Any) -> Iterable[Tuple[Any, ...]]:
    """
    Figures out if the SCALAR parameters are provided as a scalar value or a tuple
    and also if there is a wrapping container around.
    Unless the parameters are already in a wrapping container returns parameters as a tuple wrapped
    into a one-item list, e.g. [(param1[, param2, ...])]. Otherwise, returns the original input.

    Strings and bytes are treated as scalar values, not as containers. An empty container has no
    first row to inspect and is therefore wrapped like a single row with no parameters. A one-shot
    iterator is copied into a list first, because inspecting its first row would consume it.

    :param inp: Input parameters.
    """

    def is_container(value: Any) -> bool:
        # str and bytes are iterable, but each stands for one parameter value.
        return isinstance(value, Iterable) and not isinstance(value, (str, bytes))

    if not is_container(inp):
        return [(inp,)]

    if isinstance(inp, Iterator):
        inp = list(inp)

    # A sentinel tells "no first row" apart from a first row that is None.
    missing = object()
    first_row = next(iter(inp), missing)
    if first_row is not missing and is_container(first_row):
        return inp
    return [inp]
154+
155+
156+ class StandaloneMockContext (UDFContext ):
157+ """
158+ Implementation of generic UDF Mock Context interface for a SCALAR UDF or a SET UDF with no groups.
159+
160+ For Emit UDFs the output in the form of the list of tuples can be
161+ accessed by reading the `output` property.
162+ """
163+
164+ def __init__ (self , inp : Any , metadata : MockMetaData ):
165+ """
166+ :param inp: Input rows for a SET UDF or parameters for a SCALAR one.
167+ In the former case the input object must be an iterable of rows. This, for example,
168+ can be a Group object. It must implement the __len__ method. Each data row must be
169+ an indexable container, e.g. a tuple.
170+ In the SCALAR case the input can be a scalar value, or tuple. This can also be wrapped
171+ in an iterable container, similar to the SET case.
172+
173+ :param metadata: The mock metadata object.
174+ """
175+ if metadata .input_type .upper () == 'SCALAR' :
176+ self ._input = get_scalar_input (inp )
177+ else :
178+ self ._input = inp
179+ self ._metadata = metadata
180+ self ._data : Optional [Any ] = None
181+ self ._iter : Optional [Iterator [Tuple [Any , ...]]] = None
182+ self ._name_position_map = \
183+ {column .name : position
184+ for position , column
185+ in enumerate (metadata .input_columns )}
186+ self ._output = []
187+ self .next (reset = True )
188+
189+ @property
190+ def output (self ) -> List [Tuple [Any , ...]]:
191+ """Emitted output so far"""
192+ return self ._output
193+
194+ @staticmethod
195+ def _is_positive_integer (value ):
55196 return value is not None and isinstance (value , int ) and value > 0
56197
57198 def get_dataframe (self , num_rows = 'all' , start_col = 0 ):
@@ -80,26 +221,26 @@ def get_dataframe(self, num_rows='all', start_col=0):
80221 return df
81222
82223 def __getattr__ (self , name ):
83- return self ._data [self ._name_position_map [name ]]
224+ return None if self . _data is None else self ._data [self ._name_position_map [name ]]
84225
85226 def next (self , reset : bool = False ):
86- if reset :
227+ if self . _iter is None or reset :
87228 self .reset ()
88229 else :
89230 try :
90231 new_data = next (self ._iter )
91232 self ._data = new_data
92- self . _validate_tuples (self ._data , self ._metadata .input_columns )
233+ validate_emit (self ._data , self ._metadata .input_columns )
93234 return True
94235 except StopIteration as e :
95236 self ._data = None
96237 return False
97238
98239 def size (self ):
99- return self ._len
240+ return len ( self ._input )
100241
101242 def reset (self ):
102- self ._iter = iter (self ._input_group )
243+ self ._iter = iter (self ._input )
103244 self .next ()
104245
105246 def emit (self , * args ):
@@ -108,13 +249,5 @@ def emit(self, *args):
108249 else :
109250 tuples = [args ]
110251 for row in tuples :
111- self ._validate_tuples (row , self ._metadata .output_columns )
112- self ._output_group_list .extend (tuples )
113- return
114-
115- def _validate_tuples (self , row : Tuple , columns : List [Column ]):
116- if len (row ) != len (columns ):
117- raise Exception (f"row { row } has not the same number of values as columns are defined" )
118- for i , column in enumerate (columns ):
119- if row [i ] is not None and not isinstance (row [i ], column .type ):
120- raise TypeError (f"Value { row [i ]} ({ type (row [i ])} ) at position { i } is not a { column .type } " )
252+ validate_emit (row , self ._metadata .output_columns )
253+ self ._output .extend (tuples )
0 commit comments