@@ -519,9 +519,6 @@ class IntrospectAndStoreDatabaseCommand(MetaCommand):
519519 # new datasource type supported.
520520 AVOID_SCHEMAS = set (('information_schema' , 'pg_catalog' , 'crdb_internal' ))
521521
522- # As expected in real kernels in Notable. Overridden in test suite.
523- JWT_PATHNAME = "/vault/secrets/jwt"
524-
525522 def run (self , invoked_as : str , args : List [str ]) -> None :
526523 """Drive introspecting whole database, POSTing results back to Gate for storage.
527524
@@ -542,10 +539,6 @@ def run(self, invoked_as: str, args: List[str]) -> None:
542539
543540 inspector = self .get_inspector ()
544541
545- # On successful completion, will tell Gate to delete any now-orphaned
546- # relations from prior introspections older than this timestamp.
547- introspection_started_at = datetime .utcnow ()
548-
549542 # This and delta() just for development timing figures. Could become yet another
550543 # timer context manager implementation.
551544 start = time .monotonic ()
@@ -565,14 +558,11 @@ def delta() -> float:
565558 relations_and_kinds = self .all_table_and_views (inspector )
566559 print (f'Discovered { len (relations_and_kinds )} relations in { delta ()} ' )
567560
568- auth_header = self .get_auth_header ()
569-
570- # JSON-able schema descriptions to tell Gate about.
571- message_queue : List [RelationStructureDescription ] = []
572-
573561 # Introspect each relation concurrently.
574562 # TODO: Take minimum concurrency as a param?
575- with ThreadPoolExecutor (max_workers = self .MAX_INTROSPECTION_THREADS ) as executor :
563+ with ThreadPoolExecutor (
564+ max_workers = self .MAX_INTROSPECTION_THREADS
565+ ) as executor , RelationStructureMessager (ds_id ) as messenger :
576566
577567 future_to_relation = {
578568 executor .submit (
@@ -584,40 +574,15 @@ def delta() -> float:
584574 for future in as_completed (future_to_relation ):
585575 schema_name , relation_name , kind = future_to_relation [future ]
586576 try :
587- message_queue . append (future .result ())
577+ messenger . queue_for_delivery (future .result ())
588578 except Exception as exc :
589579 print (f'Exception for { schema_name } .{ relation_name } : { exc } ' )
590580
591581 table_introspection_delta = delta ()
592582 print (
593- f'Done introspecting in { table_introspection_delta } , amortized { table_introspection_delta / len (relations_and_kinds )} s per relation'
583+ f'Done introspecting and messaging gate in { table_introspection_delta } , amortized { table_introspection_delta / len (relations_and_kinds )} s per relation'
594584 )
595585
596- # Now report each result back to gate, currently one at a time. Done completely
597- # after just for relative timing measurements.
598-
599- # TODO: Could do the messaging back to gate either one-at-a-time in the worker threads,
600- # or in bulkier chunks either in the main thread when Gate offers a bulk API,
601- # or could use httpx and an event loop, ...
602-
603- # But right now doing the messaging back to gate back here in the main thread just one
604- # request at a time.
605-
606- session = requests .Session ()
607- session .headers .update (auth_header )
608-
609- if message_queue :
610- for message in message_queue :
611- self .inform_gate_relation (session , ds_id , message )
612-
613- # Clear out any prior known relations which may not exist anymore in this datasource.
614- #
615- # We do this at the tail end of things, and not the beginning, so as to not eagerly delete
616- # prior known data if we happen to croak due to some unforseen exception while introspecting.
617- self .inform_gate_completed (session , ds_id , introspection_started_at )
618-
619- print (f'Done storing discovered table and view structures in { delta ()} ' )
620-
621586 # run() contract: return what to bind to the SQL cell variable name, and if display() needs
622587 # to be called on it. Nothing and nope!
623588 return (None , False )
@@ -873,11 +838,93 @@ def get_datasource_id(self) -> UUID:
873838 handle = self .conn .name
874839 return UUID (handle [1 :])
875840
876- def get_auth_header (self ):
877- """Set up the auth header dict for making requests directly to Gate"""
841+
842+ class RelationStructureMessager :
843+ """Context manager that collects the single-relation descriptions discovered
844+ within IntrospectAndStoreDatabaseCommand, buffers up to CAPACITY at a time, then
845+ POSTs the current collected group up to Gate in a single bulk POST (which accepts
846+ at most 10).
847+
848+ Upon context exit, be sure to POST any partial remainder, then tell gate
849+ we're all done introspecting, allowing it to delete any old now no longer
850+ existing structures from prior introspections.
851+
852+ Helper class simplifying IntrospectAndStoreDatabaseCommand.run().
853+ """
854+
855+ # Overridden in test suite.
856+ CAPACITY = 10
857+ # As expected in real kernels in Notable. Overridden in test suite.
858+ JWT_PATHNAME = "/vault/secrets/jwt"
859+
860+ _relations : List [RelationStructureDescription ]
861+ _session : requests .Session
862+ _datasource_id : UUID
863+ _partition_counter : int
864+ _started_at : datetime
865+
866+ def __init__ (self , datasource_id : UUID ):
867+ self ._datasource_id = datasource_id
868+
869+ # Set up HTTP session to message Gate about what is discovered.
870+ self ._session = requests .Session ()
878871 jwt = pathlib .Path (self .JWT_PATHNAME ).read_text ()
872+ self ._session .headers .update ({"Authorization" : f"Bearer { jwt } " })
873+
874+ def __enter__ (self ):
875+ self ._relations = []
876+ self ._started_at = datetime .utcnow ()
877+ self ._partition_counter = 1
878+
879+ return self
880+
881+ def queue_for_delivery (self , relation : RelationStructureDescription ):
882+ """Stash this discovered relation. If we have enough already
883+ to send up to Gate, then do so.
884+ """
885+ self ._relations .append (relation )
886+
887+ # Time for an intermediate flush?
888+ if len (self ._relations ) == self .CAPACITY :
889+ self ._message_gate ()
890+
891+ def __exit__ (self , exc_type , exc_value , exc_traceback ):
892+ self ._message_gate (completed_introspection = True )
893+
894+ def _message_gate (self , completed_introspection : bool = False ):
895+
896+ base_url = f"http://gate.default/api/v1/datasources/{ self ._datasource_id } /schema/relations"
897+
898+ if self ._relations :
899+ # Assemble buffered relation descriptions into a single bulk upload payload
900+ jsonable_message = [
901+ relation_description .dict () for relation_description in self ._relations
902+ ]
903+
904+ # Upload in a single message.
905+ resp = self ._session .post (
906+ base_url ,
907+ json = jsonable_message ,
908+ )
909+
910+ if resp .status_code == 204 :
911+ for relation_description in self ._relations :
912+ print (
913+ f'Stored structure of { relation_description .schema_name } .{ relation_description .relation_name } in partition { self ._partition_counter } '
914+ )
915+ else :
916+ print (
917+ f'Failed storing partition { self ._partition_counter } : { resp .status_code } , { resp .text } '
918+ )
919+
920+ # Prepare for next partition.
921+ self ._partition_counter += 1
922+ self ._relations = []
879923
880- return {"Authorization" : f"Bearer { jwt } " }
924+ if completed_introspection :
925+ # Message indicating all done through asking to clear out any stored relation structures
926+ # older than when we started.
927+ self ._session .delete (f"{ base_url } ?older_than={ self ._started_at .isoformat ()} " )
881928
882929
883930def constraints_dataframe (
0 commit comments