@@ -531,7 +531,8 @@ class IntrospectAndStoreDatabaseCommand(MetaCommand):
531531 def run (self , invoked_as : str , args : List [str ]) -> None :
532532 """Drive introspecting whole database, POSTing results back to Gate for storage.
533533
534- Not really intended for end-user interactive use.
534+ Not really intended for end-user interactive use. But being wired into an undocumented
535+ invocable command let us beta test things before E2E headless introspection was completed.
535536 """
536537
537538 try :
@@ -543,53 +544,57 @@ def run(self, invoked_as: str, args: List[str]) -> None:
543544 # Cannot continue, in that there's no place to store the results gate-side, since there's
544545 # no corresponding datasources row to relate to.
545546
547+ # This will not happen in any modern headless introspection -- both Gate and Geas
548+ # protect against headlessly introspecting the legacy datasources.
549+
546550 print ('Cannot introspect into this resource.' , file = sys .stderr )
547551 return (None , False )
548552
549- inspector = self .get_inspector ()
553+ # RelationStructureMessager handles both:
554+ #
555+ # * Uploading batches of successfully discovered relations
556+ # * Catching any exception and reporting it back to gate. Will suppress the exception.
557+ #
558+ with RelationStructureMessager (ds_id ) as messenger :
559+ inspector = self .get_inspector ()
560+
561+ # This and delta() just for development timing figures. Could become yet another
562+ # timer context manager implementation.
563+ start = time .monotonic ()
564+
565+ def delta () -> float :
566+ """Record new timing section, kindof like a stopwatch lap timer.
567+ Returns the prior 'lap' time.
568+ """
569+ nonlocal start
570+
571+ now = time .monotonic ()
572+ ret = now - start
573+ start = now
574+
575+ return ret
576+
577+ relations_and_kinds = self .all_table_and_views (inspector )
578+ print (f'Discovered { len (relations_and_kinds )} relations in { delta ()} ' )
579+
580+ # Introspect each relation concurrently.
581+ # TODO: Take minimum concurrency as a param?
582+ with ThreadPoolExecutor (max_workers = self .MAX_INTROSPECTION_THREADS ) as executor :
583+ future_to_relation = {
584+ executor .submit (
585+ self .fully_introspect , inspector , schema_name , relation_name , kind
586+ ): (schema_name , relation_name , kind )
587+ for (schema_name , relation_name , kind ) in relations_and_kinds
588+ }
550589
551- # This and delta() just for development timing figures. Could become yet another
552- # timer context manager implementation.
553- start = time .monotonic ()
554-
555- def delta () -> float :
556- """Record new timing section, kindof like a stopwatch lap timer.
557- Returns the prior 'lap' time.
558- """
559- nonlocal start
560-
561- now = time .monotonic ()
562- ret = now - start
563- start = now
564-
565- return ret
566-
567- relations_and_kinds = self .all_table_and_views (inspector )
568- print (f'Discovered { len (relations_and_kinds )} relations in { delta ()} ' )
569-
570- # Introspect each relation concurrently.
571- # TODO: Take minimum concurrency as a param?
572- with ThreadPoolExecutor (
573- max_workers = self .MAX_INTROSPECTION_THREADS
574- ) as executor , RelationStructureMessager (ds_id ) as messenger :
575- future_to_relation = {
576- executor .submit (
577- self .fully_introspect , inspector , schema_name , relation_name , kind
578- ): (schema_name , relation_name , kind )
579- for (schema_name , relation_name , kind ) in relations_and_kinds
580- }
581-
582- for future in as_completed (future_to_relation ):
583- schema_name , relation_name , kind = future_to_relation [future ]
584- try :
590+ for future in as_completed (future_to_relation ):
591+ schema_name , relation_name , kind = future_to_relation [future ]
585592 messenger .queue_for_delivery (future .result ())
586- except Exception as exc :
587- print (f'Exception for { schema_name } .{ relation_name } : { exc } ' )
588593
589- table_introspection_delta = delta ()
590- print (
591- f'Done introspecting and messaging gate in { table_introspection_delta } , amortized { table_introspection_delta / len (relations_and_kinds )} s per relation'
592- )
594+ table_introspection_delta = delta ()
595+ print (
596+ f'Done introspecting and messaging gate in { table_introspection_delta } , amortized { table_introspection_delta / len (relations_and_kinds )} s per relation'
597+ )
593598
594599 # run() contract: return what to bind to the SQL cell variable name, and if display() needs
595600 # to be called on it. Nothing and nope!
@@ -870,12 +875,23 @@ def queue_for_delivery(self, relation: RelationStructureDescription):
870875
871876 # Time for an intermediate flush?
872877 if len (self ._relations ) == self .CAPACITY :
873- self ._message_gate ()
878+ self ._send_relations_to_gate ()
874879
875880 def __exit__ (self , exc_type , exc_value , exc_traceback ):
876- self ._message_gate (completed_introspection = True )
881+ if not exc_value :
882+ # Successful introspection completion.
883+ self ._send_relations_to_gate (completed_introspection = True )
877884
878- def _message_gate (self , completed_introspection : bool = False ):
885+ else :
886+ # Hit an exception. Report it back to gate.
887+ self ._send_error_to_gate (exc_value )
888+
889+ print (str (exc_value ), file = sys .stderr )
890+
891+ # Suppress the exception from being raised.
892+ return True
893+
894+ def _send_relations_to_gate (self , completed_introspection : bool = False ):
879895 base_url = f"http://gate.default/api/v1/datasources/{ self ._datasource_id } /schema/relations"
880896
881897 if self ._relations :
@@ -896,19 +912,35 @@ def _message_gate(self, completed_introspection: bool = False):
896912 f'Stored structure of { relation_description .schema_name } .{ relation_description .relation_name } in partition { self ._partition_counter } '
897913 )
898914 else :
899- print (
900- f'Failed storing partition { self ._partition_counter } : { resp .status_code } , { resp .text } '
901- )
915+ error_message = f'Failed storing partition { self ._partition_counter } : { resp .status_code } , { resp .text } '
916+ print (error_message , file = sys .stderr )
917+ # I guess let this kill us now? Arguable either way.
918+ raise Exception (error_message )
902919
903920 # Prepare for next partition.
904921 self ._partition_counter += 1
905922 self ._relations = []
906923
907924 if completed_introspection :
908925 # Message indicating all done through asking to clear out any stored relation structures
909- # older than when we started.
926+ # older than when we started. Curiously via DELETE, this signals the successful logical
927+ # end of the introspection lifecycle.
910928 self ._session .delete (f"{ base_url } ?older_than={ self ._started_at .isoformat ()} " )
911929
930+ def _send_error_to_gate (self , exception : Exception ):
931+ # Sigh. Something, anything bad happened. Report it back to Gate.
932+ error_message : str = make_introspection_error_human_presentable (exception )
933+
934+ jsonable_message = {'error' : error_message }
935+
936+ url = f"http://gate.default/api/v1/datasources/{ self ._datasource_id } /schema/introspection-error"
937+
938+ # Don't care about the response code. There's nothing we can do at this point.
939+ self ._session .post (
940+ url ,
941+ json = jsonable_message ,
942+ )
943+
912944
913945def constraints_dataframe (
914946 inspector : SchemaStrippingInspector , table_name : str , schema : Optional [str ]
@@ -1236,3 +1268,12 @@ def _raise_from_no_such_table(schema: str, relation_name: str):
12361268 else :
12371269 msg = f'Relation { relation_name } does not exist'
12381270 raise MetaCommandException (msg )
1271+
1272+
1273+ def make_introspection_error_human_presentable (exception : Exception ) -> str :
1274+ """Convert any exception encountered while introspecting a database into a nice, human-presentable string."""
1275+
1276+ # Will ultimately become complex due to N kinds of errors x M different SQLA database dialects possibly reporting errors differently
1277+
1278+ # But to start with ...
1279+ return str (exception )
0 commit comments