feat(templates): add EntityDef and entity-driven orchestration to SqlMetadataExtractor#1298
feat(templates): add EntityDef and entity-driven orchestration to SqlMetadataExtractor#1298
Conversation
…MetadataExtractor
Introduces declarative entity definitions that replace hardcoded
asyncio.gather of 4 entities in SqlMetadataExtractor.run().
- EntityDef dataclass: name, sql, endpoint, phase, enabled, timeout
- Phased orchestration: entities grouped by phase, run concurrently
within phase, sequentially across phases
- _fetch_entity() dispatches to fetch_{name}() methods by convention
- Full backward compat: empty entities list falls back to default 4
- Adding a new entity type is one line in the entities list
✅ Snyk checks have passed. No issues have been found so far.
💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse. |
📜 Docstring Coverage ReportRESULT: PASSED (minimum: 30.0%, actual: 79.7%) Detailed Coverage Report |
📦 Trivy Vulnerability Scan Results
Report SummaryCould not generate summary table (data length mismatch: 9 vs 8). Scan Result Detailsrequirements.txtuv.lock |
📦 Trivy Secret Scan Results
Report SummaryCould not generate summary table (data length mismatch: 9 vs 8). Scan Result Detailsrequirements.txtuv.lock |
☂️ Python Coverage
Overall Coverage
New FilesNo new covered files... Modified FilesNo covered modified files...
|
|
🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/1298 |
…ults The hardcoded field mapping silently dropped counts for custom entities (e.g. stages, streams). Now any result key matching an ExtractionOutput field gets populated automatically via model_fields introspection.
Replace manual asyncio.gather + run() override examples with the declarative EntityDef approach. Add EntityDef fields reference table and multi-phase Snowflake example.
Replace class-attribute SQL strings and super() delegation with EntityDef declarations and proper @task implementations using the actual BaseSQLClient API (load + run_query).
Code ReviewThis PR introduces Confidence Score: 4/5
Important Files Changed
Change FlowsequenceDiagram
participant Caller as Caller (Temporal)
participant SME as SqlMetadataExtractor.run()
participant GE as _get_entities()
participant FE as _fetch_entity()
participant Task as fetch_{name}() @task
Caller->>SME: ExtractionInput
SME->>GE: get enabled entities
GE-->>SME: list[EntityDef]
Note over SME: Group entities by phase
loop For each phase (sorted)
SME->>FE: asyncio.gather(*entities_in_phase)
FE->>Task: dispatch to fetch_{name}(task_input)
Task-->>FE: (result_key, count)
FE-->>SME: phase results
end
SME->>SME: upload_to_atlan()
SME-->>Caller: ExtractionOutput
Findings
|
Refactors SqlQueryExtractor to use the same entity-driven pattern as
SqlMetadataExtractor. The "queries" entity handles batching internally
(get_query_batches → fetch_queries per batch). Custom entities dispatch
to fetch_{name}() by convention.
Prepares for future Option A where queries can be declared as an entity
in SqlMetadataExtractor alongside metadata entities.
…tration Replace super() delegation with EntityDef declaration and proper @task implementations using the actual BaseSQLClient API.
Aryamanz29
left a comment
There was a problem hiding this comment.
Code Review: EntityDef and Entity-Driven Orchestration
Overview
Replaces hardcoded asyncio.gather of 4 entities in SqlMetadataExtractor.run() with a declarative EntityDef dataclass and phase-based orchestration. Also applies the same pattern to SqlQueryExtractor. Good design direction — reduces boilerplate for connectors with more than 4 entity types.
Strengths
EntityDefis clean — frozen dataclass, well-documented fields, sensible defaults- Phase grouping works well — concurrent within phase, sequential across phases
- Backward compatible — empty
entitieslist falls back to the default 4 - Good test coverage — 17 new tests for EntityDef,
_get_entities,_fetch_entity, phase grouping
Issues
1. Duplicated orchestration logic (Major)
SqlMetadataExtractor.run() and SqlQueryExtractor.run() have nearly identical phase-grouping and execution code:
phases: dict[int, list[EntityDef]] = {}
for entity in entities:
phases.setdefault(entity.phase, []).append(entity)
for phase_num in sorted(phases):
phase_results = await asyncio.gather(...)This should be extracted into a shared method on BaseMetadataExtractor or a standalone function to avoid drift between the two.
2. _fetch_entity has different signatures (Medium)
SqlMetadataExtractor._fetch_entity(self, entity, base_input)— 2 argsSqlQueryExtractor._fetch_entity(self, entity, base_input, workflow_args)— 3 args
The query extractor needs workflow_args for the batch loop, but this breaks the uniform interface. Consider passing workflow_args through the QueryExtractionInput model instead, so both _fetch_entity methods have the same signature.
3. depends_on is declared but not implemented (Minor)
EntityDef.depends_on is documented as "Not yet implemented in orchestration — reserved for future use." This is fine for now, but it means the phase-based grouping is the only ordering mechanism. Just ensure it's tracked for future work.
4. Dynamic **entity_counts in output construction (Minor)
output_fields = ExtractionOutput.model_fields
entity_counts = {k: v for k, v in results.items() if k in output_fields}
return ExtractionOutput(**entity_counts, ...)This silently drops result keys that don't match ExtractionOutput fields. If a connector defines EntityDef(name="stages"), the result stages_extracted won't appear in the output unless ExtractionOutput has that field. Consider logging a warning when result keys are dropped, or document that connectors must extend ExtractionOutput for custom entities.
5. EntityDef.sql and EntityDef.endpoint are unused (Minor)
These fields are declared on EntityDef but never read by the orchestration logic. The _fetch_entity method dispatches to fetch_{name}() regardless of whether sql or endpoint is set. Either:
- Wire them into the dispatch (e.g. auto-generate SQL fetch when
sqlis set) - Or remove them and let subclasses handle fetch strategy in their
fetch_*methods
If keeping them as documentation-only hints, note that in the docstring.
6. Missing error handling for individual entity failures
If one entity in a phase fails, asyncio.gather raises immediately and cancels sibling tasks. Consider return_exceptions=True to let other entities in the phase complete, then report the failures together.
Nitpicks
- The SQL example removes the
transformstep entirely — the guide should clarify thattransform_datais still available and called after all entities complete QueryExtractionOutputusestotal_queriesas result key butQueryExtractionOutput.model_fieldsmay not have that key — verify the mapping works
Verdict
Good direction. The duplicated orchestration code (issue #1) is the main thing to fix before merge.
…ispatch, de-dup orchestration - Rename EntityDef → ExtractableEntity (avoid Atlas typedef collision) - Replace `name` field with `task_name` for explicit method mapping - Remove unused fields: sql, endpoint, pagination, response_items_key - Extract shared `run_entity_phases()` to eliminate duplicated orchestration logic between SqlMetadataExtractor and SqlQueryExtractor - Unify `_fetch_entity` signature to `(entity, base_input)` across both extractors; query extractor computes workflow_args internally - Add `return_exceptions=True` to asyncio.gather so sibling entities complete before errors are raised - Log warnings for result keys that don't match output model fields - Update examples, tests (21 pass, +4 new), and docs
|
Suggest we hold on this briefly until most other |
|
Closing — labeled "do not merge". EntityDef feature is tracked separately. Branch preserved — reopen against main when ready. |
Summary
EntityDeffrozen dataclass for declarative entity definitions (name, sql, endpoint, phase, enabled, timeout, result_key, depends_on)asyncio.gatherof 4 entities inSqlMetadataExtractor.run()with phased orchestration — entities grouped by phase run concurrently within phase, sequentially across phases_fetch_entity()dispatches tofetch_{name}()methods by convention;_get_entities()falls back to default 4 (databases, schemas, tables, columns) whenentitiesis emptyBaseMetadataExtractorbase class,upload_to_atlanstep,rewraperror handlingTest plan
test_entity_registry.pycovering EntityDef,_get_entities,_fetch_entity, and phase groupingtest_sql_metadata_extractor.pytests pass🤖 Generated with Claude Code