diff --git a/tests/test_server/fixtures/factories/lineage.py b/tests/test_server/fixtures/factories/lineage.py index 08d6b2a8..eace57c4 100644 --- a/tests/test_server/fixtures/factories/lineage.py +++ b/tests/test_server/fixtures/factories/lineage.py @@ -5,19 +5,11 @@ import pytest_asyncio from sqlalchemy.ext.asyncio import AsyncSession -from data_rentgen.db.models import DatasetColumnRelationType, DatasetSymlinkType, Job, RunStatus, User +from data_rentgen.db.models import DatasetColumnRelationType, DatasetSymlinkType, RunStatus, User from data_rentgen.db.models.output import OutputType from data_rentgen.utils.uuid import generate_static_uuid -from tests.test_server.fixtures.factories.dataset import create_dataset, make_symlink -from tests.test_server.fixtures.factories.input import create_input -from tests.test_server.fixtures.factories.job import create_job -from tests.test_server.fixtures.factories.job_type import create_job_type -from tests.test_server.fixtures.factories.location import create_location -from tests.test_server.fixtures.factories.operation import create_operation -from tests.test_server.fixtures.factories.output import create_output +from tests.test_server.fixtures.factories.lineage_builder import LineageBuilder from tests.test_server.fixtures.factories.relations import create_column_lineage, create_column_relation -from tests.test_server.fixtures.factories.run import create_run -from tests.test_server.fixtures.factories.schema import create_schema from tests.test_server.utils.delete import clean_db from tests.test_server.utils.lineage_result import LineageResult @@ -25,37 +17,47 @@ @pytest_asyncio.fixture() async def simple_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], - job: Job, user: User, ) -> AsyncGenerator[LineageResult, None]: - # Two independent operations, run twice: - # J1 -> R1 -> O1, D1 -> O1 -> D2 - # J1 -> R1 -> O2, D3 -> O2 -> D4 - # J1 -> R2 -> O3, D1 -> O3 -> D2 - # J1 -> R2 -> O4, D3 -> 
O4 -> D4 + """ + Two independent operations, run twice: + J1 -> R1 -> O1, D1 -> O1 -> D2 + J1 -> R1 -> O2, D3 -> O2 -> D4 + J1 -> R2 -> O3, D1 -> O3 -> D2 + J1 -> R2 -> O4, D3 -> O4 -> D4 + """ num_runs = 2 num_operations = 2 num_datasets = 4 - lineage = LineageResult(jobs=[job]) async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) + job_location = await builder.create_location(key="simple_lineage_job_location") + job_type = await builder.create_job_type(key="simple_lineage_job_type") + job = await builder.create_job( + key="simple_lineage_job", + location=job_location, + job_type=job_type, + ) created_at = datetime.now(tz=UTC) + for n in range(num_runs): - run = await create_run( - async_session, + run = await builder.create_run( + key=f"run_{n}", + job=job, run_kwargs={ "job_id": job.id, "created_at": created_at + timedelta(seconds=0.1 * n), "started_by_user_id": user.id, }, ) - lineage.runs.append(run) # Each run has 2 operations operations = [ - await create_operation( - async_session, + await builder.create_operation( + key=f"run_{n}_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2 * i), @@ -63,17 +65,28 @@ async def simple_lineage( ) for i in range(num_operations) ] - lineage.operations.extend(operations) - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) + dataset_locations = [ + await builder.create_location(key=f"run_{n}_dataset_location_{i}") for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"run_{n}_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] - schema = await create_schema(async_session) + schema = await builder.create_schema(key=f"run_{n}_schema") - inputs = [ - await create_input( 
- async_session, + [ + await builder.create_input( + key=f"run_{n}_input_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[2 * i], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -85,11 +98,16 @@ async def simple_lineage( ) for i, operation in enumerate(operations) ] - lineage.inputs.extend(inputs) - outputs = [ - await create_output( - async_session, + [ + await builder.create_output( + key=f"run_{n}_output_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[2 * i + 1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -102,7 +120,9 @@ async def simple_lineage( ) for i, operation in enumerate(operations) ] - lineage.outputs.extend(outputs) + + lineage = builder.build() + lineage.jobs = [job] yield lineage @@ -113,37 +133,46 @@ async def simple_lineage( @pytest_asyncio.fixture() async def three_days_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], - job: Job, user: User, ) -> AsyncGenerator[LineageResult, None]: - # Several of J -> R -> O, connected via same pair of datasets: - # J0 -> R0 -> O0, D0 -> O0 -> D1 - # J0 -> R0 -> O1, D1 -> O1 -> D2 - # J1 -> R1 -> O2, D0 -> O2 -> D1 - # J1 -> R1 -> O3, D1 -> O3 -> D2 - # J2 -> R2 -> O4, D0 -> O4 -> D1 - # J2 -> R2 -> O5, D1 -> O5 -> D2 - # Runs are 1 day apart. - - lineage = LineageResult() - lineage.jobs.append(job) + """ + Several of J -> R -> O, connected via same pair of datasets: + J0 -> R0 -> O0, D0 -> O0 -> D1 + J0 -> R0 -> O1, D1 -> O1 -> D2 + J1 -> R1 -> O2, D0 -> O2 -> D1 + J1 -> R1 -> O3, D1 -> O3 -> D2 + J2 -> R2 -> O4, D0 -> O4 -> D1 + J2 -> R2 -> O5, D1 -> O5 -> D2 + Runs are 1 day apart. 
+ """ + created_at = datetime.now(tz=UTC) async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) + job_location = await builder.create_location(key="three_days_lineage_job_location") + job_type = await builder.create_job_type(key="three_days_lineage_job_type") + job = await builder.create_job( + key="three_days_lineage_job", + location=job_location, + job_type=job_type, + ) + for day in range(3): - run = await create_run( - async_session, + run = await builder.create_run( + key=f"three_days_run_{day}", + job=job, run_kwargs={ "job_id": job.id, "created_at": created_at + timedelta(days=day), "started_by_user_id": user.id, }, ) - lineage.runs.append(run) operations = [ - await create_operation( - async_session, + await builder.create_operation( + key=f"three_days_run_{day}_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2 * i), @@ -151,17 +180,28 @@ async def three_days_lineage( ) for i in range(2) ] - lineage.operations.extend(operations) - dataset_locations = [await create_location(async_session) for _ in range(3)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) + dataset_locations = [ + await builder.create_location(key=f"three_days_run_{day}_dataset_location_{i}") for i in range(3) + ] + datasets = [ + await builder.create_dataset( + key=f"three_days_run_{day}_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] - schema = await create_schema(async_session) + schema = await builder.create_schema(key=f"three_days_run_{day}_schema") - inputs = [ - await create_input( - async_session, + [ + await builder.create_input( + key=f"three_days_run_{day}_input_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ 
-173,11 +213,16 @@ async def three_days_lineage( ) for i, operation in enumerate(operations) ] - lineage.inputs.extend(inputs) - outputs = [ - await create_output( - async_session, + [ + await builder.create_output( + key=f"three_days_run_{day}_output_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i + 1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at + timedelta(seconds=0.1), "operation_id": operation.id, @@ -190,7 +235,9 @@ async def three_days_lineage( ) for i, operation in enumerate(operations) ] - lineage.outputs.extend(outputs) + + lineage = builder.build() + lineage.jobs = [job] yield lineage @@ -203,52 +250,65 @@ async def lineage_with_depth( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Three trees of J -> R -> O, connected via datasets: - # J1 -> R1 -> O1, D1 -> O1 -> D2 - # J2 -> R2 -> O2, D2 -> O2 -> D3 - # J3 -> R3 -> O3, D3 -> O3 -> D4 - # J4 -> R4 -> O4, D4 -> O4 -> D5 + """ + Three trees of J -> R -> O, connected via datasets: + J1 -> R1 -> O1, D1 -> O1 -> D2 + J2 -> R2 -> O2, D2 -> O2 -> D3 + J3 -> R3 -> O3, D3 -> O3 -> D4 + J4 -> R4 -> O4, D4 -> O4 -> D5 + """ num_datasets = 5 num_jobs = 4 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - - schema = await create_schema(async_session) + builder = LineageBuilder(async_session) + dataset_locations = [ + await builder.create_location(key=f"lineage_with_depth_dataset_location_{i}") for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"lineage_with_depth_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] + 
schema = await builder.create_schema(key="lineage_with_depth_schema") # Create a job, run and operation with IO datasets. for i in range(num_jobs): - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - - run = await create_run( - async_session, + job_location = await builder.create_location(key=f"lineage_with_depth_job_location_{i}") + job_type = await builder.create_job_type(key=f"lineage_with_depth_job_type_{i}") + job = await builder.create_job( + key=f"lineage_with_depth_job_{i}", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key=f"lineage_with_depth_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - lineage.runs.append(run) - - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key=f"lineage_with_depth_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2), }, ) - lineage.operations.append(operation) - - input_ = await create_input( - async_session, + await builder.create_input( + key=f"lineage_with_depth_input_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -258,10 +318,15 @@ async def lineage_with_depth( "schema_id": schema.id, }, ) - lineage.inputs.append(input_) - output = await create_output( - async_session, + await builder.create_output( + key=f"lineage_with_depth_output_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i + 1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -272,7 +337,8 @@ async def 
lineage_with_depth( "schema_id": schema.id, }, ) - lineage.outputs.append(output) + + lineage = builder.build() yield lineage @@ -285,52 +351,65 @@ async def cyclic_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Two trees of J -> R -> O, forming a cycle: - # J1 -> R1 -> O1, D1 -> O1 -> D2 - # J2 -> R2 -> O2, D2 -> O2 -> D1 + """ + Two trees of J -> R -> O, forming a cycle: + J1 -> R1 -> O1, D1 -> O1 -> D2 + J2 -> R2 -> O2, D2 -> O2 -> D1 + """ num_datasets = 2 num_jobs = 2 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - - schema = await create_schema(async_session) + builder = LineageBuilder(async_session) + dataset_locations = [ + await builder.create_location(key=f"cyclic_lineage_dataset_location_{i}") for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"cyclic_lineage_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] + schema = await builder.create_schema(key="cyclic_lineage_schema") # Create a job, run and operation with IO datasets. 
for i in range(num_jobs): from_dataset, to_dataset = (datasets[0], datasets[1]) if i == 0 else (datasets[1], datasets[0]) - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - - run = await create_run( - async_session, + job_location = await builder.create_location(key=f"cyclic_lineage_job_location_{i}") + job_type = await builder.create_job_type(key=f"cyclic_lineage_job_type_{i}") + job = await builder.create_job( + key=f"cyclic_lineage_job_{i}", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key=f"cyclic_lineage_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - lineage.runs.append(run) - - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key=f"cyclic_lineage_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2), }, ) - lineage.operations.append(operation) - - input = await create_input( - async_session, + await builder.create_input( + key=f"cyclic_lineage_input_{i}", + operation=operation, + run=run, + job=job, + dataset=from_dataset, + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -340,10 +419,15 @@ async def cyclic_lineage( "schema_id": schema.id, }, ) - lineage.inputs.append(input) - output = await create_output( - async_session, + await builder.create_output( + key=f"cyclic_lineage_output_{i}", + operation=operation, + run=run, + job=job, + dataset=to_dataset, + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -354,7 +438,8 @@ async def cyclic_lineage( "schema_id": schema.id, }, ) - lineage.outputs.append(output) + 
+ lineage = builder.build() yield lineage @@ -367,45 +452,51 @@ async def self_referencing_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Example then table can be its own source: - # J1 -> R1 -> O1, D1 -> O1 -> D1 # reading duplicates and removing them + """ + Example then table can be its own source: + J1 -> R1 -> O1, D1 -> O1 -> D1 # reading duplicates and removing them + """ created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_location = await create_location(async_session) - dataset = await create_dataset(async_session, location_id=dataset_location.id) - lineage.datasets.append(dataset) - - schema = await create_schema(async_session) - - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - - run = await create_run( - async_session, + builder = LineageBuilder(async_session) + dataset_location = await builder.create_location(key="self_ref_dataset_location") + dataset = await builder.create_dataset(key="self_ref_dataset", location=dataset_location) + + schema = await builder.create_schema(key="self_ref_schema") + + job_location = await builder.create_location(key="self_ref_job_location") + job_type = await builder.create_job_type(key="self_ref_job_type") + job = await builder.create_job( + key="self_ref_job", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key="self_ref_run", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=5), }, ) - lineage.runs.append(run) - - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key="self_ref_operation", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at 
+ timedelta(seconds=1), }, ) - lineage.operations.append(operation) - - input = await create_input( - async_session, + await builder.create_input( + key="self_ref_input", + operation=operation, + run=run, + job=job, + dataset=dataset, + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -415,10 +506,15 @@ async def self_referencing_lineage( "schema_id": schema.id, }, ) - lineage.inputs.append(input) - output = await create_output( - async_session, + await builder.create_output( + key="self_ref_output", + operation=operation, + run=run, + job=job, + dataset=dataset, + output_type=OutputType.OVERWRITE, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -429,7 +525,8 @@ async def self_referencing_lineage( "schema_id": schema.id, }, ) - lineage.outputs.append(output) + + lineage = builder.build() yield lineage @@ -442,47 +539,60 @@ async def lineage_with_non_connected_operations( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Run interacted with 2 datasets, but in different operations: - # J1 -> R1 -> O1, D1 -> O1 # SELECT max(id) FROM table1 - # J1 -> R1 -> O2, O2 -> D2 # INSERT INTO table1 VALUES + """ + Run interacted with 2 datasets, but in different operations: + J1 -> R1 -> O1, D1 -> O1 # SELECT max(id) FROM table1 + J1 -> R1 -> O2, O2 -> D2 # INSERT INTO table1 VALUES + """ num_datasets = 2 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - - schema = await create_schema(async_session) - - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await 
create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - - run = await create_run( - async_session, + builder = LineageBuilder(async_session) + dataset_locations = [ + await builder.create_location(key=f"non_connected_dataset_location_{i}") for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"non_connected_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] + schema = await builder.create_schema(key="non_connected_schema") + + job_location = await builder.create_location(key="non_connected_job_location") + job_type = await builder.create_job_type(key="non_connected_job_type") + job = await builder.create_job( + key="non_connected_job", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key="non_connected_run", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=10), }, ) - lineage.runs.append(run) - - operation1 = await create_operation( - async_session, + operation1 = await builder.create_operation( + key="non_connected_operation_1", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=1), }, ) - lineage.operations.append(operation1) - - input1 = await create_input( - async_session, + await builder.create_input( + key="non_connected_input_1", + operation=operation1, + run=run, + job=job, + dataset=datasets[0], + schema=schema, input_kwargs={ "created_at": operation1.created_at, "operation_id": operation1.id, @@ -492,19 +602,23 @@ async def lineage_with_non_connected_operations( "schema_id": schema.id, }, ) - lineage.inputs.append(input1) - operation2 = await create_operation( - async_session, + operation2 = await builder.create_operation( + key="non_connected_operation_2", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=2), }, ) - 
lineage.operations.append(operation2) - - output2 = await create_output( - async_session, + await builder.create_output( + key="non_connected_output_2", + operation=operation2, + run=run, + job=job, + dataset=datasets[1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation2.created_at, "operation_id": operation2.id, @@ -515,7 +629,8 @@ async def lineage_with_non_connected_operations( "schema_id": schema.id, }, ) - lineage.outputs.append(output2) + + lineage = builder.build() yield lineage @@ -528,15 +643,17 @@ async def duplicated_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Two trees of J -> R -> O, interacting with the same dataset multiple times: - # J0 -> R0 -> O0, D0 -> O0 -> D1 - # J0 -> R0 -> O1, D0 -> O1 -> D1 - # J0 -> R1 -> O2, D0 -> O2 -> D1 - # J0 -> R1 -> O3, D0 -> O3 -> D1 - # J1 -> R2 -> O4, D0 -> O4 -> D1 - # J1 -> R2 -> O5, D0 -> O5 -> D1 - # J1 -> R3 -> O6, D0 -> O6 -> D1 - # J1 -> R3 -> O7, D0 -> O7 -> D1 + """ + Two trees of J -> R -> O, interacting with the same dataset multiple times: + J0 -> R0 -> O0, D0 -> O0 -> D1 + J0 -> R0 -> O1, D0 -> O1 -> D1 + J0 -> R1 -> O2, D0 -> O2 -> D1 + J0 -> R1 -> O3, D0 -> O3 -> D1 + J1 -> R2 -> O4, D0 -> O4 -> D1 + J1 -> R2 -> O5, D0 -> O5 -> D1 + J1 -> R3 -> O6, D0 -> O6 -> D1 + J1 -> R3 -> O7, D0 -> O7 -> D1 + """ num_datasets = 2 num_jobs = 2 @@ -544,50 +661,61 @@ async def duplicated_lineage( operations_per_run = 2 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - - schema = await create_schema(async_session) + builder = LineageBuilder(async_session) + dataset_locations = [ + await 
builder.create_location(key=f"duplicated_lineage_dataset_location_{i}") for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"duplicated_lineage_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) + ] + schema = await builder.create_schema(key="duplicated_lineage_schema") # Create a job, run and operation with IO datasets. for i in range(num_jobs): - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - + job_location = await builder.create_location(key=f"duplicated_lineage_job_location_{i}") + job_type = await builder.create_job_type(key=f"duplicated_lineage_job_type_{i}") + job = await builder.create_job( + key=f"duplicated_lineage_job_{i}", + location=job_location, + job_type=job_type, + ) runs = [ - await create_run( - async_session, + await builder.create_run( + key=f"duplicated_lineage_job_{i}_run_{run_idx}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - for _ in range(runs_per_job) + for run_idx in range(runs_per_job) ] - lineage.runs.extend(runs) - operations = [ - await create_operation( - async_session, + await builder.create_operation( + key=f"duplicated_lineage_job_{i}_run_{run_idx}_operation_{operation_idx}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2), }, ) - for run in runs - for _ in range(operations_per_run) + for run_idx, run in enumerate(runs) + for operation_idx in range(operations_per_run) ] - lineage.operations.extend(operations) - - inputs = [ - await create_input( - async_session, + [ + await builder.create_input( + key=f"duplicated_lineage_job_{i}_input_{op_idx}", + operation=operation, + run=next(run for run in runs if run.id == operation.run_id), + job=job, + 
dataset=datasets[0], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -597,13 +725,18 @@ async def duplicated_lineage( "schema_id": schema.id, }, ) - for operation in operations + for op_idx, operation in enumerate(operations) ] - lineage.inputs.extend(inputs) - outputs = [ - await create_output( - async_session, + [ + await builder.create_output( + key=f"duplicated_lineage_job_{i}_output_{op_idx}", + operation=operation, + run=next(run for run in runs if run.id == operation.run_id), + job=job, + dataset=datasets[1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -614,9 +747,10 @@ async def duplicated_lineage( "schema_id": schema.id, }, ) - for operation in operations + for op_idx, operation in enumerate(operations) ] - lineage.outputs.extend(outputs) + + lineage = builder.build() yield lineage @@ -629,50 +763,55 @@ async def branchy_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ): - # Three trees of J -> R -> O, connected via D3 and D6, but having other inputs & outputs: - # D0 D1 - # \ / - # J0 -> R0 -> O0 -> D2 - # \ - # D3 D4 - # \ / - # J1 -> R1 -> O1 -> D5 - # \ - # D6 D7 - # \ / - # J2 -> R2 -> O2 -> D8 - # \ - # D9 + """ + Three trees of J -> R -> O, connected via D3 and D6, but having other inputs & outputs: + D0 D1 + \\ / + J0 -> R0 -> O0 -> D2 + \\ + D3 D4 + \\ / + J1 -> R1 -> O1 -> D5 + \\ + D6 D7 + \\ / + J2 -> R2 -> O2 -> D8 + \\ + D9 + """ num_datasets = 10 num_jobs = 3 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_locations = [await create_location(async_session) for _ in range(num_datasets)] + builder = LineageBuilder(async_session) + dataset_locations = [ + await builder.create_location(key=f"branchy_dataset_location_{i}") for i in range(num_datasets) + ] datasets = [ - await
create_dataset(async_session, location_id=location.id, dataset_kwargs={"name": f"dataset_{i}"}) + await builder.create_dataset( + key=f"branchy_dataset_{i}", + location=location, + dataset_kwargs={"name": f"dataset_{i}"}, + ) for i, location in enumerate(dataset_locations) ] - lineage.datasets.extend(datasets) - - job_locations = [await create_location(async_session) for _ in range(num_jobs)] - job_type = await create_job_type(async_session) + job_locations = [await builder.create_location(key=f"branchy_job_location_{i}") for i in range(num_jobs)] + job_type = await builder.create_job_type(key="branchy_job_type") jobs = [ - await create_job( - async_session, - location_id=job_location.id, - job_type_id=job_type.id, + await builder.create_job( + key=f"branchy_job_{i}", + location=job_location, + job_type=job_type, job_kwargs={"name": f"job_{i}"}, ) for i, job_location in enumerate(job_locations) ] - lineage.jobs.extend(jobs) - runs = [ - await create_run( - async_session, + await builder.create_run( + key=f"branchy_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, @@ -682,11 +821,10 @@ async def branchy_lineage( ) for i, job in enumerate(jobs) ] - lineage.runs.extend(runs) - operations = [ - await create_operation( - async_session, + await builder.create_operation( + key=f"branchy_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2), @@ -695,13 +833,16 @@ async def branchy_lineage( ) for i, run in enumerate(runs) ] - lineage.operations.extend(operations) - - schema = await create_schema(async_session) - - inputs = [ - await create_input( - async_session, + schema = await builder.create_schema(key="branchy_schema") + + [ + await builder.create_input( + key=f"branchy_input_{i}_0", + operation=operation, + run=run, + job=job, + dataset=datasets[3 * i], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -713,8 +854,13 @@ 
async def branchy_lineage( ) for i, (operation, run, job) in enumerate(zip(operations, runs, jobs, strict=False)) ] + [ - await create_input( - async_session, + await builder.create_input( + key=f"branchy_input_{i}_1", + operation=operation, + run=run, + job=job, + dataset=datasets[3 * i + 1], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -726,11 +872,16 @@ async def branchy_lineage( ) for i, (operation, run, job) in enumerate(zip(operations, runs, jobs, strict=False)) ] - lineage.inputs.extend(inputs) - outputs = [ - await create_output( - async_session, + [ + await builder.create_output( + key=f"branchy_output_{i}_0", + operation=operation, + run=run, + job=job, + dataset=datasets[3 * i + 2], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -743,8 +894,14 @@ async def branchy_lineage( ) for i, (operation, run, job) in enumerate(zip(operations, runs, jobs, strict=False)) ] + [ - await create_output( - async_session, + await builder.create_output( + key=f"branchy_output_{i}_1", + operation=operation, + run=run, + job=job, + dataset=datasets[3 * i + 3], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -757,7 +914,8 @@ async def branchy_lineage( ) for i, (operation, run, job) in enumerate(zip(operations, runs, jobs, strict=False)) ] - lineage.outputs.extend(outputs) + + lineage = builder.build() yield lineage @@ -770,71 +928,100 @@ async def lineage_with_symlinks( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # Three trees of J -> R -> O, connected to datasets via symlinks: - # J1 -> R1 -> O1, D1 -> O1 -> D2S - # J2 -> R2 -> O2, D2 -> O2 -> D3S - # J3 -> R3 -> O3, D3 -> O2 -> D4S + """ + Three trees of J -> R -> O, connected to datasets via symlinks: + J1 -> 
R1 -> O1, D1 -> O1 -> D2S + J2 -> R2 -> O2, D2 -> O2 -> D3S + J3 -> R3 -> O3, D3 -> O2 -> D4S - # TODO: This fixture create a different structure. (D1 -> O1 -> D1S). It must be fixed ! + TODO: This fixture creates a different structure. (D1 -> O1 -> D1S). It must be fixed. + """ - lineage = LineageResult() created_at = datetime.now(tz=UTC) num_datasets = 4 num_jobs = 3 async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) dataset_locations = [ - await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(num_datasets) + await builder.create_location( + key=f"lineage_with_symlinks_dataset_location_{i}", + location_kwargs={"type": "hdfs"}, + ) + for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"lineage_with_symlinks_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) ] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - symlink_locations = [ - await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(num_datasets) + await builder.create_location( + key=f"lineage_with_symlinks_symlink_location_{i}", + location_kwargs={"type": "hive"}, + ) + for i in range(num_datasets) ] symlink_datasets = [ - await create_dataset(async_session, location_id=location.id) for location in symlink_locations + await builder.create_dataset( + key=f"lineage_with_symlinks_symlink_dataset_{i}", + location=location, + ) + for i, location in enumerate(symlink_locations) ] - lineage.datasets.extend(symlink_datasets) - # Make symlinks - for dataset, symlink_dataset in zip(datasets, symlink_datasets, strict=False): - metastore = [await make_symlink(async_session, dataset, symlink_dataset, DatasetSymlinkType.METASTORE)] - lineage.dataset_symlinks.extend(metastore) + for i, (dataset, symlink_dataset) in enumerate(zip(datasets, 
symlink_datasets, strict=False)): + await builder.create_dataset_symlink( + key=f"lineage_with_symlinks_metastore_{i}", + from_dataset=dataset, + to_dataset=symlink_dataset, + type=DatasetSymlinkType.METASTORE, + ) - warehouse = [await make_symlink(async_session, symlink_dataset, dataset, DatasetSymlinkType.WAREHOUSE)] - lineage.dataset_symlinks.extend(warehouse) + await builder.create_dataset_symlink( + key=f"lineage_with_symlinks_warehouse_{i}", + from_dataset=symlink_dataset, + to_dataset=dataset, + type=DatasetSymlinkType.WAREHOUSE, + ) - schema = await create_schema(async_session) + schema = await builder.create_schema(key="lineage_with_symlinks_schema") # Make graphs for i in range(num_jobs): - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) - - run = await create_run( - async_session, + job_location = await builder.create_location(key=f"lineage_with_symlinks_job_location_{i}") + job_type = await builder.create_job_type(key=f"lineage_with_symlinks_job_type_{i}") + job = await builder.create_job( + key=f"lineage_with_symlinks_job_{i}", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key=f"lineage_with_symlinks_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - lineage.runs.append(run) - - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key=f"lineage_with_symlinks_operation_{i}", + run=run, operation_kwargs={ "created_at": run.created_at + timedelta(seconds=0.2), "run_id": run.id, }, ) - lineage.operations.append(operation) - - input = await create_input( - async_session, + await builder.create_input( + key=f"lineage_with_symlinks_input_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i], + 
schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -844,9 +1031,14 @@ async def lineage_with_symlinks( "schema_id": schema.id, }, ) - lineage.inputs.append(input) - output = await create_output( - async_session, + await builder.create_output( + key=f"lineage_with_symlinks_output_{i}", + operation=operation, + run=run, + job=job, + dataset=symlink_datasets[i], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -857,7 +1049,8 @@ async def lineage_with_symlinks( "schema_id": schema.id, }, ) - lineage.outputs.append(output) + + lineage = builder.build() yield lineage @@ -870,69 +1063,98 @@ async def lineage_with_symlinks_dataset_granularity( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # Three trees of J -> R -> O, connected to datasets via symlinks: - # J1 -> R1 -> O1, D1 -> O1 -> D2S - # J2 -> R2 -> O2, D2 -> O2 -> D3S - # J3 -> R3 -> O3, D3 -> O2 -> D4S + """ + Three trees of J -> R -> O, connected to datasets via symlinks: + J1 -> R1 -> O1, D1 -> O1 -> D2S + J2 -> R2 -> O2, D2 -> O2 -> D3S + J3 -> R3 -> O3, D3 -> O2 -> D4S + """ - lineage = LineageResult() created_at = datetime.now(tz=UTC) num_datasets = 4 num_jobs = 3 async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) dataset_locations = [ - await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(num_datasets) + await builder.create_location( + key=f"lineage_with_symlinks_granularity_dataset_location_{i}", + location_kwargs={"type": "hdfs"}, + ) + for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"lineage_with_symlinks_granularity_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) ] - datasets = [await create_dataset(async_session, 
location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - symlink_locations = [ - await create_location(async_session, location_kwargs={"type": "hive"}) for _ in range(num_datasets) + await builder.create_location( + key=f"lineage_with_symlinks_granularity_symlink_location_{i}", + location_kwargs={"type": "hive"}, + ) + for i in range(num_datasets) ] symlink_datasets = [ - await create_dataset(async_session, location_id=location.id) for location in symlink_locations + await builder.create_dataset( + key=f"lineage_with_symlinks_granularity_symlink_dataset_{i}", + location=location, + ) + for i, location in enumerate(symlink_locations) ] - lineage.datasets.extend(symlink_datasets) - # Make symlinks - for dataset, symlink_dataset in zip(datasets, symlink_datasets, strict=False): - metastore = [await make_symlink(async_session, dataset, symlink_dataset, DatasetSymlinkType.METASTORE)] - lineage.dataset_symlinks.extend(metastore) + for i, (dataset, symlink_dataset) in enumerate(zip(datasets, symlink_datasets, strict=False)): + await builder.create_dataset_symlink( + key=f"lineage_with_symlinks_granularity_metastore_{i}", + from_dataset=dataset, + to_dataset=symlink_dataset, + type=DatasetSymlinkType.METASTORE, + ) - warehouse = [await make_symlink(async_session, symlink_dataset, dataset, DatasetSymlinkType.WAREHOUSE)] - lineage.dataset_symlinks.extend(warehouse) + await builder.create_dataset_symlink( + key=f"lineage_with_symlinks_granularity_warehouse_{i}", + from_dataset=symlink_dataset, + to_dataset=dataset, + type=DatasetSymlinkType.WAREHOUSE, + ) - schema = await create_schema(async_session) + schema = await builder.create_schema(key="lineage_with_symlinks_granularity_schema") # Make graphs for i in range(num_jobs): - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - 
lineage.jobs.append(job) - - run = await create_run( - async_session, + job_location = await builder.create_location(key=f"lineage_with_symlinks_granularity_job_location_{i}") + job_type = await builder.create_job_type(key=f"lineage_with_symlinks_granularity_job_type_{i}") + job = await builder.create_job( + key=f"lineage_with_symlinks_granularity_job_{i}", + location=job_location, + job_type=job_type, + ) + run = await builder.create_run( + key=f"lineage_with_symlinks_granularity_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - lineage.runs.append(run) - - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key=f"lineage_with_symlinks_granularity_operation_{i}", + run=run, operation_kwargs={ "created_at": run.created_at + timedelta(seconds=0.2), "run_id": run.id, }, ) - lineage.operations.append(operation) - - input = await create_input( - async_session, + await builder.create_input( + key=f"lineage_with_symlinks_granularity_input_{i}", + operation=operation, + run=run, + job=job, + dataset=datasets[i], + schema=schema, input_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -942,9 +1164,14 @@ async def lineage_with_symlinks_dataset_granularity( "schema_id": schema.id, }, ) - lineage.inputs.append(input) - output = await create_output( - async_session, + await builder.create_output( + key=f"lineage_with_symlinks_granularity_output_{i}", + operation=operation, + run=run, + job=job, + dataset=symlink_datasets[i + 1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -955,7 +1182,8 @@ async def lineage_with_symlinks_dataset_granularity( "schema_id": schema.id, }, ) - lineage.outputs.append(output) + + lineage = builder.build() yield lineage @@ -968,23 +1196,40 @@ async def lineage_with_unconnected_symlinks( 
lineage_with_depth: LineageResult, async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], ) -> AsyncGenerator[LineageResult, None]: - # Same as lineage_with_depth, but each dataset has also a symlink, - # not connected to any input or output. + """ + Same as lineage_with_depth, but each dataset has also a symlink, + not connected to any input or output. + """ lineage = lineage_with_depth async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) existing_datasets = lineage.datasets.copy() - for dataset in existing_datasets: - another_location = await create_location(async_session) - another_dataset = await create_dataset(async_session, location_id=another_location.id) - lineage.datasets.append(another_dataset) + for i, dataset in enumerate(existing_datasets): + another_location = await builder.create_location(key=f"unconnected_symlink_location_{i}") + another_dataset = await builder.create_dataset( + key=f"unconnected_symlink_dataset_{i}", + location=another_location, + ) + + await builder.create_dataset_symlink( + key=f"unconnected_symlink_metastore_{i}", + from_dataset=another_dataset, + to_dataset=dataset, + type=DatasetSymlinkType.METASTORE, + ) - metastore = [await make_symlink(async_session, another_dataset, dataset, DatasetSymlinkType.METASTORE)] - lineage.dataset_symlinks.extend(metastore) + await builder.create_dataset_symlink( + key=f"unconnected_symlink_warehouse_{i}", + from_dataset=dataset, + to_dataset=another_dataset, + type=DatasetSymlinkType.WAREHOUSE, + ) - warehouse = [await make_symlink(async_session, dataset, another_dataset, DatasetSymlinkType.WAREHOUSE)] - lineage.dataset_symlinks.extend(warehouse) + lineage_build = builder.build() + lineage.datasets.extend(lineage_build.datasets) + lineage.dataset_symlinks.extend(lineage_build.dataset_symlinks) yield lineage @@ -997,20 +1242,22 @@ async def duplicated_lineage_with_column_lineage( async_session_maker: Callable[[], 
AbstractAsyncContextManager[AsyncSession]], duplicated_lineage: LineageResult, ) -> AsyncGenerator[LineageResult, None]: - # At this fixture we add column lineage to check relation types aggregation on different levels. - # O0 will have two direct and indirect (IDENTITY, TRANSFORMATION and FILTER, JOIN) relations for same source-target columns. - # O1 will have same source-target column as O0 but another relations type(TRANSFORMATION_MASKING and GROUP_BY). - # O2 will have same source-target column as O0 and O1 but another relations type(AGGREGATION and SORT). - - # Two trees of J -> R -> O, interacting with the same dataset multiple times: - # J0 -> R0 -> O0, D0 -> O0 -> D1 - # J0 -> R0 -> O1, D0 -> O1 -> D1 - # J0 -> R1 -> O2, D0 -> O2 -> D1 - # J0 -> R1 -> O3, D0 -> O3 -> D1 - # J1 -> R2 -> O4, D0 -> O4 -> D1 - # J1 -> R2 -> O5, D0 -> O5 -> D1 - # J1 -> R3 -> O6, D0 -> O6 -> D1 - # J1 -> R3 -> O7, D0 -> O7 -> D1 + """ + Add column lineage to check relation type aggregation on different levels. + O0 has direct and indirect relations for the same source-target columns. + O1 has the same source-target columns as O0 but different relation types. + O2 has the same source-target columns as O0/O1 but different relation types. 
+ + Two trees of J -> R -> O, interacting with the same dataset multiple times: + J0 -> R0 -> O0, D0 -> O0 -> D1 + J0 -> R0 -> O1, D0 -> O1 -> D1 + J0 -> R1 -> O2, D0 -> O2 -> D1 + J0 -> R1 -> O3, D0 -> O3 -> D1 + J1 -> R2 -> O4, D0 -> O4 -> D1 + J1 -> R2 -> O5, D0 -> O5 -> D1 + J1 -> R3 -> O6, D0 -> O6 -> D1 + J1 -> R3 -> O7, D0 -> O7 -> D1 + """ operation_relations_matrix = ( (0, 0, DatasetColumnRelationType.IDENTITY, DatasetColumnRelationType.FILTER), (0, 0, DatasetColumnRelationType.TRANSFORMATION, DatasetColumnRelationType.JOIN), @@ -1021,10 +1268,11 @@ async def duplicated_lineage_with_column_lineage( lineage = duplicated_lineage async with async_session_maker() as async_session: for operation, run, direct_type, indirect_type in operation_relations_matrix: + fingerprint = generate_static_uuid(direct_type.name + indirect_type.name) # Direct await create_column_relation( async_session, - fingerprint=generate_static_uuid(direct_type.name + indirect_type.name), + fingerprint=fingerprint, column_relation_kwargs={ "type": direct_type.value, "source_column": "direct_source_column", @@ -1034,7 +1282,7 @@ async def duplicated_lineage_with_column_lineage( # Indirect await create_column_relation( async_session, - fingerprint=generate_static_uuid(direct_type.name + indirect_type.name), + fingerprint=fingerprint, column_relation_kwargs={ "type": indirect_type.value, "source_column": "indirect_source_column", @@ -1045,12 +1293,12 @@ async def duplicated_lineage_with_column_lineage( async_session, column_lineage_kwargs={ "created_at": lineage.operations[operation].created_at, - "operation_id": lineage.operations[operation].id, - "run_id": lineage.runs[run].id, - "job_id": lineage.jobs[0].id, - "source_dataset_id": lineage.datasets[0].id, - "target_dataset_id": lineage.datasets[1].id, - "fingerprint": generate_static_uuid(direct_type.name + indirect_type.name), + "operation": lineage.operations[operation], + "run": lineage.runs[run], + "job": lineage.jobs[0], + 
"source_dataset": lineage.datasets[0], + "target_dataset": lineage.datasets[1], + "fingerprint": fingerprint, }, ) @@ -1065,13 +1313,15 @@ async def lineage_with_depth_and_with_column_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], lineage_with_depth: LineageResult, ) -> AsyncGenerator[LineageResult, None]: - # Three trees of J -> R -> O, connected via datasets: - # J1 -> R1 -> O1, D1 -> O1 -> D2 - # J2 -> R2 -> O2, D2 -> O2 -> D3 - # J3 -> R3 -> O3, D3 -> O3 -> D4 + """ + Three trees of J -> R -> O, connected via datasets: + J1 -> R1 -> O1, D1 -> O1 -> D2 + J2 -> R2 -> O2, D2 -> O2 -> D3 + J3 -> R3 -> O3, D3 -> O3 -> D4 - # Each Operation will have same column lineage. - # So we can test not only depths but also same lineage for different operations, runs and jobs + Each operation has the same column lineage, so we can test both depth and + repeated lineage across different operations, runs, and jobs. + """ lineage = lineage_with_depth async with async_session_maker() as async_session: @@ -1101,11 +1351,11 @@ async def lineage_with_depth_and_with_column_lineage( async_session, column_lineage_kwargs={ "created_at": lineage.operations[i].created_at, - "operation_id": lineage.operations[i].id, - "run_id": lineage.runs[i].id, - "job_id": lineage.jobs[i].id, - "source_dataset_id": lineage.datasets[i].id, - "target_dataset_id": lineage.datasets[i + 1].id, + "operation": lineage.operations[i], + "run": lineage.runs[i], + "job": lineage.jobs[i], + "source_dataset": lineage.datasets[i], + "target_dataset": lineage.datasets[i + 1], "fingerprint": generate_static_uuid(f"job_{i}"), }, ) @@ -1121,53 +1371,64 @@ async def lineage_with_different_dataset_interactions( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # Tree J -> R -> O0...03, interacting with the same dataset multiple times with different operations types: - # J0 -> R0 -> O0, O0 -> D1 - # J0 
-> R0 -> O1, O1 -> D1 - # J0 -> R1 -> O2, O2 -> D1 + """ + Tree J -> R -> O0...03, interacting with the same dataset multiple times + with different operation types: + J0 -> R0 -> O0, O0 -> D1 + J0 -> R0 -> O1, O1 -> D1 + J0 -> R1 -> O2, O2 -> D1 + """ operations_per_run = 3 created_at = datetime.now(tz=UTC) - lineage = LineageResult() async with async_session_maker() as async_session: - dataset_location = await create_location(async_session) - dataset = await create_dataset(async_session, location_id=dataset_location.id) - lineage.datasets.append(dataset) + builder = LineageBuilder(async_session) + dataset_location = await builder.create_location(key="different_dataset_interactions_dataset_location") + dataset = await builder.create_dataset(key="different_dataset_interactions_dataset", location=dataset_location) - schema = await create_schema(async_session) + schema = await builder.create_schema(key="different_dataset_interactions_schema") # Create a job, run and operation with IO datasets. 
- job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) + job_location = await builder.create_location(key="different_dataset_interactions_job_location") + job_type = await builder.create_job_type(key="different_dataset_interactions_job_type") + job = await builder.create_job( + key="different_dataset_interactions_job", + location=job_location, + job_type=job_type, + ) - run = await create_run( - async_session, + run = await builder.create_run( + key="different_dataset_interactions_run", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=1), }, ) - lineage.runs.append(run) operations = [ - await create_operation( - async_session, + await builder.create_operation( + key=f"different_dataset_interactions_operation_{i}", + run=run, operation_kwargs={ "run_id": run.id, "created_at": run.created_at + timedelta(seconds=0.2), }, ) - for _ in range(operations_per_run) + for i in range(operations_per_run) ] - lineage.operations.extend(operations) - outputs = [ - await create_output( - async_session, + [ + await builder.create_output( + key=f"different_dataset_interactions_output_{i}", + operation=operation, + run=run, + job=job, + dataset=dataset, + output_type=type_, + schema=schema, output_kwargs={ "created_at": operation.created_at, "operation_id": operation.id, @@ -1178,13 +1439,16 @@ async def lineage_with_different_dataset_interactions( "schema_id": schema.id, }, ) - for operation, type_ in zip( - operations, - [OutputType.OVERWRITE, OutputType.TRUNCATE, OutputType.DROP], - strict=False, + for i, (operation, type_) in enumerate( + zip( + operations, + [OutputType.OVERWRITE, OutputType.TRUNCATE, OutputType.DROP], + strict=False, + ) ) ] - lineage.outputs.extend(outputs) + + lineage = builder.build() yield lineage @@ -1197,55 +1461,75 @@ 
async def lineage_for_long_running_operations( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # Three trees of J -> R -> O, but each operation+dataset produced multiple inputs/outputs: - # J1 -> R1 -> O1, D1 -> O1 -> D2 - # J2 -> R2 -> O2, D2 -> O2 -> D3 - # J3 -> R3 -> O3, D3 -> O2 -> D4 + """ + Three trees of J -> R -> O, but each operation+dataset produces multiple + inputs/outputs: + J1 -> R1 -> O1, D1 -> O1 -> D2 + J2 -> R2 -> O2, D2 -> O2 -> D3 + J3 -> R3 -> O3, D3 -> O2 -> D4 + """ - lineage = LineageResult() created_at = datetime.now(tz=UTC) num_datasets = 4 num_jobs = 3 num_io = 10 async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) dataset_locations = [ - await create_location(async_session, location_kwargs={"type": "hdfs"}) for _ in range(num_datasets) + await builder.create_location( + key=f"long_running_dataset_location_{i}", + location_kwargs={"type": "hdfs"}, + ) + for i in range(num_datasets) + ] + datasets = [ + await builder.create_dataset( + key=f"long_running_dataset_{i}", + location=location, + ) + for i, location in enumerate(dataset_locations) ] - datasets = [await create_dataset(async_session, location_id=location.id) for location in dataset_locations] - lineage.datasets.extend(datasets) - schema = await create_schema(async_session) + schema = await builder.create_schema(key="long_running_schema") # Make graphs for i in range(num_jobs): - job_location = await create_location(async_session) - job_type = await create_job_type(async_session) - job = await create_job(async_session, location_id=job_location.id, job_type_id=job_type.id) - lineage.jobs.append(job) + job_location = await builder.create_location(key=f"long_running_job_location_{i}") + job_type = await builder.create_job_type(key=f"long_running_job_type_{i}") + job = await builder.create_job( + key=f"long_running_job_{i}", + location=job_location, + 
job_type=job_type, + ) - run = await create_run( - async_session, + run = await builder.create_run( + key=f"long_running_run_{i}", + job=job, run_kwargs={ "job_id": job.id, "started_by_user_id": user.id, "created_at": created_at + timedelta(seconds=i), }, ) - lineage.runs.append(run) - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key=f"long_running_operation_{i}", + run=run, operation_kwargs={ "created_at": run.created_at + timedelta(seconds=0.2), "run_id": run.id, }, ) - lineage.operations.append(operation) for io in range(num_io): - input_ = await create_input( - async_session, + await builder.create_input( + key=f"long_running_input_{i}_{io}", + operation=operation, + run=run, + job=job, + dataset=datasets[i], + schema=schema, input_kwargs={ "created_at": operation.created_at + timedelta(hours=io), "operation_id": operation.id, @@ -1258,10 +1542,15 @@ async def lineage_for_long_running_operations( "num_bytes": io, }, ) - lineage.inputs.append(input_) - output = await create_output( - async_session, + await builder.create_output( + key=f"long_running_output_{i}_{io}", + operation=operation, + run=run, + job=job, + dataset=datasets[i + 1], + output_type=OutputType.APPEND, + schema=schema, output_kwargs={ "created_at": operation.created_at + timedelta(hours=io), "operation_id": operation.id, @@ -1275,7 +1564,8 @@ async def lineage_for_long_running_operations( "num_bytes": io, }, ) - lineage.outputs.append(output) + + lineage = builder.build() yield lineage @@ -1288,15 +1578,16 @@ async def lineage_for_long_running_operations_with_column_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], lineage_for_long_running_operations: LineageResult, ) -> AsyncGenerator[LineageResult, None]: - # Same as lineage_for_long_running_operations, but with column lineage + """Same as lineage_for_long_running_operations, but with column lineage.""" lineage = lineage_for_long_running_operations 
num_io = 3 async with async_session_maker() as async_session: + builder = LineageBuilder(async_session) for i in range(len(lineage.jobs)): # Direct - await create_column_relation( - async_session, + await builder.create_column_relation( + key=f"long_running_column_direct_{i}", fingerprint=generate_static_uuid(f"job_{i}"), column_relation_kwargs={ "type": DatasetColumnRelationType.AGGREGATION.value, @@ -1305,8 +1596,8 @@ async def lineage_for_long_running_operations_with_column_lineage( }, ) # Indirect - await create_column_relation( - async_session, + await builder.create_column_relation( + key=f"long_running_column_indirect_{i}", fingerprint=generate_static_uuid(f"job_{i}"), column_relation_kwargs={ "type": DatasetColumnRelationType.JOIN.value, @@ -1316,16 +1607,16 @@ async def lineage_for_long_running_operations_with_column_lineage( ) for io in range(num_io): - await create_column_lineage( - async_session, + await builder.create_column_lineage( + key=f"long_running_column_lineage_{i}_{io}", + operation=lineage.operations[i], + run=lineage.runs[i], + job=lineage.jobs[i], + source_dataset=lineage.datasets[i], + target_dataset=lineage.datasets[i + 1], + fingerprint=generate_static_uuid(f"job_{i}"), column_lineage_kwargs={ "created_at": lineage.operations[i].created_at + timedelta(hours=io), - "operation_id": lineage.operations[i].id, - "run_id": lineage.runs[i].id, - "job_id": lineage.jobs[i].id, - "source_dataset_id": lineage.datasets[i].id, - "target_dataset_id": lineage.datasets[i + 1].id, - "fingerprint": generate_static_uuid(f"job_{i}"), }, ) @@ -1340,45 +1631,61 @@ async def lineage_with_parent_run_relations( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], user: User, ) -> AsyncGenerator[LineageResult, None]: - # Lineage with chain of runs: Airflow Task -> Airflow Job -> Spark Job - # R0 -> R1 -> R2 - lineage = LineageResult() + """ + Lineage with chain of runs: Airflow Task -> Airflow Job -> Spark Job + R0 -> R1 -> R2 + """ 
created_at = datetime.now(tz=UTC) async with async_session_maker() as async_session: - dataset_location = await create_location(async_session, location_kwargs={"type": "hdfs"}) - dataset = await create_dataset(async_session, location_id=dataset_location.id) - lineage.datasets.extend([dataset]) + builder = LineageBuilder(async_session) + dataset_location = await builder.create_location( + key="parent_run_relations_dataset_location", + location_kwargs={"type": "hdfs"}, + ) + dataset = await builder.create_dataset( + key="parent_run_relations_dataset", + location=dataset_location, + ) - spark_location = await create_location(async_session) - airflow_location = await create_location(async_session) + spark_location = await builder.create_location(key="parent_run_relations_spark_location") + airflow_location = await builder.create_location(key="parent_run_relations_airflow_location") - spark_application_job_type = await create_job_type(async_session, job_type_kwargs={"type": "SPARK_APPLICATION"}) - airflow_dag_job_type = await create_job_type(async_session, job_type_kwargs={"type": "AIRFLOW_DAG"}) - airflow_task_job_type = await create_job_type(async_session, job_type_kwargs={"type": "AIRFLOW_TASK"}) + spark_application_job_type = await builder.create_job_type( + key="parent_run_relations_spark_job_type", + job_type_kwargs={"type": "SPARK_APPLICATION"}, + ) + airflow_dag_job_type = await builder.create_job_type( + key="parent_run_relations_airflow_dag_job_type", + job_type_kwargs={"type": "AIRFLOW_DAG"}, + ) + airflow_task_job_type = await builder.create_job_type( + key="parent_run_relations_airflow_task_job_type", + job_type_kwargs={"type": "AIRFLOW_TASK"}, + ) - airflow_dag = await create_job( - async_session, - location_id=airflow_location.id, - job_type_id=airflow_dag_job_type.id, + airflow_dag = await builder.create_job( + key="parent_run_relations_airflow_dag", + location=airflow_location, + job_type=airflow_dag_job_type, job_kwargs={"name": "airflow_dag_name"}, ) 
- airflow_task = await create_job( - async_session, - location_id=airflow_location.id, - job_type_id=airflow_task_job_type.id, + airflow_task = await builder.create_job( + key="parent_run_relations_airflow_task", + location=airflow_location, + job_type=airflow_task_job_type, job_kwargs={"name": "airflow_task_name", "parent_job_id": airflow_dag.id}, ) - spark_application = await create_job( - async_session, - location_id=spark_location.id, - job_type_id=spark_application_job_type.id, + spark_application = await builder.create_job( + key="parent_run_relations_spark_application", + location=spark_location, + job_type=spark_application_job_type, job_kwargs={"name": "spark_application_name", "parent_job_id": airflow_task.id}, ) - lineage.jobs = [airflow_dag, airflow_task, spark_application] - airflow_dag_run = await create_run( - async_session, + airflow_dag_run = await builder.create_run( + key="parent_run_relations_airflow_dag_run", + job=airflow_dag, run_kwargs={ "job_id": airflow_dag.id, "started_by_user_id": user.id, @@ -1389,8 +1696,9 @@ async def lineage_with_parent_run_relations( "ended_at": None, }, ) - airflow_task_run = await create_run( - async_session, + airflow_task_run = await builder.create_run( + key="parent_run_relations_airflow_task_run", + job=airflow_task, run_kwargs={ "job_id": airflow_task.id, "parent_run_id": airflow_dag_run.id, @@ -1401,8 +1709,9 @@ async def lineage_with_parent_run_relations( "ended_at": created_at + timedelta(seconds=240), }, ) - spark_app_run1 = await create_run( - async_session, + await builder.create_run( + key="parent_run_relations_spark_run_1", + job=spark_application, run_kwargs={ "job_id": spark_application.id, "parent_run_id": airflow_task_run.id, @@ -1413,8 +1722,9 @@ async def lineage_with_parent_run_relations( "ended_at": created_at + timedelta(seconds=60), }, ) - spark_app_run2 = await create_run( - async_session, + spark_app_run2 = await builder.create_run( + key="parent_run_relations_spark_run_2", + 
job=spark_application, run_kwargs={ "job_id": spark_application.id, "started_by_user_id": user.id, @@ -1425,19 +1735,22 @@ async def lineage_with_parent_run_relations( "ended_at": created_at + timedelta(seconds=120), }, ) - lineage.runs = [airflow_dag_run, airflow_task_run, spark_app_run1, spark_app_run2] - operation = await create_operation( - async_session, + operation = await builder.create_operation( + key="parent_run_relations_operation", + run=spark_app_run2, operation_kwargs={ "created_at": spark_app_run2.created_at + timedelta(seconds=0.2), "run_id": spark_app_run2.id, }, ) - lineage.operations.append(operation) - input_ = await create_input( - async_session, + await builder.create_input( + key="parent_run_relations_input", + operation=operation, + run=spark_app_run2, + job=spark_application, + dataset=dataset, input_kwargs={ "created_at": operation.created_at + timedelta(hours=1), "operation_id": operation.id, @@ -1446,7 +1759,8 @@ async def lineage_with_parent_run_relations( "dataset_id": dataset.id, }, ) - lineage.inputs.append(input_) + + lineage = builder.build() yield lineage diff --git a/tests/test_server/fixtures/factories/lineage_builder.py b/tests/test_server/fixtures/factories/lineage_builder.py new file mode 100644 index 00000000..381c2eb1 --- /dev/null +++ b/tests/test_server/fixtures/factories/lineage_builder.py @@ -0,0 +1,528 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from data_rentgen.db.models import ( + Address, + ColumnLineage, + Dataset, + DatasetColumnRelation, + DatasetSymlink, + DatasetSymlinkType, + Input, + Job, + JobDependency, + JobType, + Location, + Operation, + Output, + OutputType, + Run, + Schema, + Tag, + User, +) +from tests.test_server.fixtures.factories.address import create_address as create_address_model +from tests.test_server.fixtures.factories.dataset import create_dataset as create_dataset_model +from tests.test_server.fixtures.factories.dataset import make_symlink as 
class LineageBuilder:
    """Accumulates lineage test entities in per-key registries for reuse across a fixture.

    Every ``create_*`` method is memoized on a caller-chosen string ``key``:
    the first call creates the row through the corresponding factory helper,
    and any later call with the same key returns the already-created object.
    ``build()`` snapshots the registries into a :class:`LineageResult` that
    fixtures yield to tests.
    """

    def __init__(self, async_session: AsyncSession) -> None:
        self.async_session = async_session
        self.lineage_result = LineageResult()

        # Entity registries by user-defined keys for deterministic reuse in tests.
        self.addresses: dict[str, Address] = {}
        self.locations: dict[str, Location] = {}
        self.schemas: dict[str, Schema] = {}
        self.tags: dict[str, Tag] = {}
        self.tag_values: dict[str, TagValue] = {}
        self.datasets: dict[str, Dataset] = {}
        self.job_types: dict[str, JobType] = {}
        self.jobs: dict[str, Job] = {}
        self.users: dict[str, User] = {}
        self.runs: dict[str, Run] = {}
        self.sql_queries: dict[str, SQLQuery] = {}
        self.operations: dict[str, Operation] = {}
        self.inputs: dict[str, Input] = {}
        self.outputs: dict[str, Output] = {}
        self.personal_tokens: dict[str, PersonalToken] = {}

        # Relation registries.
        self.dataset_symlinks: dict[str, DatasetSymlink] = {}
        self.job_dependencies: dict[str, JobDependency] = {}
        self.column_lineage: dict[str, ColumnLineage] = {}
        self.column_relations: dict[str, DatasetColumnRelation] = {}

        # Optional reverse index useful for debugging and ad-hoc assertions.
        self.by_id: dict[str, object] = {}

    async def create_location(
        self,
        key: str,
        location_kwargs: dict | None = None,
        address_kwargs: dict | None = None,
    ) -> Location:
        """Create-or-return a Location (with its address) registered under *key*."""
        if key in self.locations:
            return self.locations[key]

        location = await create_location_model(
            async_session=self.async_session,
            location_kwargs=location_kwargs,
            address_kwargs=address_kwargs,
        )
        self.locations[key] = location
        self.by_id[str(location.id)] = location
        return location

    async def create_address(
        self,
        key: str,
        location: Location,
        address_kwargs: dict | None = None,
    ) -> Address:
        """Create-or-return an Address attached to *location*, registered under *key*."""
        if key in self.addresses:
            return self.addresses[key]

        payload = dict(address_kwargs or {})
        address = await create_address_model(
            async_session=self.async_session,
            location_id=location.id,
            address_kwargs=payload,
        )
        self.addresses[key] = address
        self.by_id[str(address.id)] = address
        return address

    async def create_schema(
        self,
        key: str,
        schema_kwargs: dict | None = None,
    ) -> Schema:
        """Create-or-return a Schema registered under *key*."""
        if key in self.schemas:
            return self.schemas[key]

        payload = dict(schema_kwargs or {})
        schema = await create_schema_model(
            async_session=self.async_session,
            schema_kwargs=payload,
        )
        self.schemas[key] = schema
        self.by_id[str(schema.id)] = schema
        return schema

    async def create_tag(
        self,
        key: str,
        tag_kwargs: dict | None = None,
    ) -> Tag:
        """Create-or-return a Tag registered under *key*."""
        if key in self.tags:
            return self.tags[key]

        payload = dict(tag_kwargs or {})
        tag = await create_tag_model(
            async_session=self.async_session,
            tag_kwargs=payload,
        )
        self.tags[key] = tag
        self.by_id[str(tag.id)] = tag
        return tag

    async def create_tag_value(
        self,
        key: str,
        tag: Tag,
        tag_value_kwargs: dict | None = None,
    ) -> TagValue:
        """Create-or-return a TagValue for *tag*, registered under *key*."""
        if key in self.tag_values:
            return self.tag_values[key]

        payload = dict(tag_value_kwargs or {})
        tag_value = await create_tag_value_model(
            async_session=self.async_session,
            tag_id=tag.id,
            tag_value_kwargs=payload,
        )
        self.tag_values[key] = tag_value
        self.by_id[str(tag_value.id)] = tag_value
        return tag_value

    async def create_dataset(
        self,
        key: str,
        location: Location,
        dataset_kwargs: dict | None = None,
        tag_values: set[TagValue] | None = None,
    ) -> Dataset:
        """Create-or-return a Dataset at *location*, registered under *key*."""
        if key in self.datasets:
            return self.datasets[key]

        payload = dict(dataset_kwargs or {})
        dataset = await create_dataset_model(
            async_session=self.async_session,
            location_id=location.id,
            dataset_kwargs=payload,
            tag_values=tag_values,
        )
        self.datasets[key] = dataset
        self.by_id[str(dataset.id)] = dataset
        return dataset

    async def create_dataset_symlink(
        self,
        key: str,
        from_dataset: Dataset,
        to_dataset: Dataset,
        type: DatasetSymlinkType,
    ) -> DatasetSymlink:
        """Create-or-return a DatasetSymlink between two datasets, registered under *key*."""
        if key in self.dataset_symlinks:
            return self.dataset_symlinks[key]

        symlink = await make_dataset_symlink_model(
            async_session=self.async_session,
            from_dataset=from_dataset,
            to_dataset=to_dataset,
            type=type,
        )
        self.dataset_symlinks[key] = symlink
        self.by_id[str(symlink.id)] = symlink
        return symlink

    async def create_job_type(
        self,
        key: str,
        job_type_kwargs: dict | None = None,
    ) -> JobType:
        """Create-or-return a JobType registered under *key*."""
        if key in self.job_types:
            return self.job_types[key]

        payload = dict(job_type_kwargs or {})
        job_type = await create_job_type_model(
            async_session=self.async_session,
            job_type_kwargs=payload,
        )
        self.job_types[key] = job_type
        self.by_id[str(job_type.id)] = job_type
        return job_type

    async def create_job(
        self,
        key: str,
        location: Location,
        job_type: JobType,
        job_kwargs: dict | None = None,
        tag_values: set[TagValue] | None = None,
    ) -> Job:
        """Create-or-return a Job at *location* with *job_type*, registered under *key*."""
        if key in self.jobs:
            return self.jobs[key]

        payload = dict(job_kwargs or {})
        job = await create_job_model(
            async_session=self.async_session,
            location_id=location.id,
            job_type_id=job_type.id,
            job_kwargs=payload,
            tag_values=tag_values,
        )
        self.jobs[key] = job
        self.by_id[str(job.id)] = job
        return job

    async def create_job_dependency(
        self,
        key: str,
        from_job: Job,
        to_job: Job,
        dependency_type: str = "DIRECT_DEPENDENCY",
    ) -> JobDependency:
        """Create-or-return a JobDependency row (committed directly, no factory helper)."""
        if key in self.job_dependencies:
            return self.job_dependencies[key]

        dependency = JobDependency(
            from_job_id=from_job.id,
            to_job_id=to_job.id,
            type=dependency_type,
        )
        self.async_session.add(dependency)
        await self.async_session.commit()
        await self.async_session.refresh(dependency)

        self.job_dependencies[key] = dependency
        # JobDependency doesn't expose a stable single-column id in this model.
        self.by_id[key] = dependency
        return dependency

    async def create_user(
        self,
        key: str,
        user_kwargs: dict | None = None,
    ) -> User:
        """Create-or-return a User registered under *key*."""
        if key in self.users:
            return self.users[key]

        payload = dict(user_kwargs or {})
        user = await create_user_model(
            async_session=self.async_session,
            user_kwargs=payload,
        )
        self.users[key] = user
        self.by_id[str(user.id)] = user
        return user

    async def create_run(
        self,
        key: str,
        job: Job,
        run_kwargs: dict | None = None,
    ) -> Run:
        """Create-or-return a Run of *job*, registered under *key*.

        ``job_id`` in *run_kwargs* is always overwritten with ``job.id`` so the
        explicit *job* argument is authoritative.
        """
        if key in self.runs:
            return self.runs[key]

        payload = dict(run_kwargs or {})
        payload["job_id"] = job.id
        run = await create_run_model(
            async_session=self.async_session,
            run_kwargs=payload,
        )
        self.runs[key] = run
        self.by_id[str(run.id)] = run
        return run

    async def create_sql_query(
        self,
        key: str,
        sql_query_kwargs: dict | None = None,
    ) -> SQLQuery:
        """Create-or-return a SQLQuery registered under *key*."""
        if key in self.sql_queries:
            return self.sql_queries[key]

        payload = dict(sql_query_kwargs or {})
        sql_query = await create_sql_query_model(
            async_session=self.async_session,
            sql_query_kwargs=payload,
        )
        self.sql_queries[key] = sql_query
        self.by_id[str(sql_query.id)] = sql_query
        return sql_query

    async def create_operation(
        self,
        key: str,
        run: Run,
        operation_kwargs: dict | None = None,
        sql_query: SQLQuery | None = None,
    ) -> Operation:
        """Create-or-return an Operation of *run*, registered under *key*."""
        if key in self.operations:
            return self.operations[key]

        payload = dict(operation_kwargs or {})
        payload["run_id"] = run.id
        if sql_query is not None:
            payload["sql_query_id"] = sql_query.id

        operation = await create_operation_model(
            async_session=self.async_session,
            operation_kwargs=payload,
        )
        self.operations[key] = operation
        self.by_id[str(operation.id)] = operation
        return operation

    async def create_input(
        self,
        key: str,
        operation: Operation,
        run: Run,
        job: Job,
        dataset: Dataset,
        schema: Schema | None = None,
        input_kwargs: dict | None = None,
    ) -> Input:
        """Create-or-return an Input linking operation/run/job/dataset, registered under *key*."""
        if key in self.inputs:
            return self.inputs[key]

        payload = dict(input_kwargs or {})
        payload.update(
            {
                "operation_id": operation.id,
                "run_id": run.id,
                "job_id": job.id,
                "dataset_id": dataset.id,
            },
        )
        # Fix: don't clobber an explicit ``schema_id`` passed via *input_kwargs*
        # when no *schema* object is given; only default it to NULL.
        if schema is not None:
            payload["schema_id"] = schema.id
        else:
            payload.setdefault("schema_id", None)
        input_row = await create_input_model(
            async_session=self.async_session,
            input_kwargs=payload,
        )
        self.inputs[key] = input_row
        self.by_id[str(input_row.id)] = input_row
        return input_row

    async def create_output(
        self,
        key: str,
        operation: Operation,
        run: Run,
        job: Job,
        dataset: Dataset,
        output_type: OutputType | None = None,
        schema: Schema | None = None,
        output_kwargs: dict | None = None,
    ) -> Output:
        """Create-or-return an Output linking operation/run/job/dataset, registered under *key*."""
        if key in self.outputs:
            return self.outputs[key]

        payload = dict(output_kwargs or {})
        payload.update(
            {
                "operation_id": operation.id,
                "run_id": run.id,
                "job_id": job.id,
                "dataset_id": dataset.id,
            },
        )
        # Fix: don't clobber an explicit ``schema_id`` passed via *output_kwargs*
        # when no *schema* object is given; only default it to NULL.
        if schema is not None:
            payload["schema_id"] = schema.id
        else:
            payload.setdefault("schema_id", None)
        if output_type is not None:
            payload["type"] = output_type

        output = await create_output_model(
            async_session=self.async_session,
            output_kwargs=payload,
        )
        self.outputs[key] = output
        self.by_id[str(output.id)] = output
        return output

    async def create_column_lineage(
        self,
        key: str,
        operation: Operation,
        run: Run,
        job: Job,
        source_dataset: Dataset,
        target_dataset: Dataset,
        fingerprint: UUID | None = None,
        column_lineage_kwargs: dict | None = None,
    ) -> ColumnLineage:
        """Create-or-return a ColumnLineage row between two datasets, registered under *key*."""
        if key in self.column_lineage:
            return self.column_lineage[key]

        payload = dict(column_lineage_kwargs or {})
        payload.update(
            {
                "operation_id": operation.id,
                "run_id": run.id,
                "job_id": job.id,
                "source_dataset_id": source_dataset.id,
                "target_dataset_id": target_dataset.id,
            },
        )
        if fingerprint is not None:
            payload["fingerprint"] = fingerprint

        column_lineage = await create_column_lineage_model(
            async_session=self.async_session,
            column_lineage_kwargs=payload,
        )
        self.column_lineage[key] = column_lineage
        self.by_id[str(column_lineage.id)] = column_lineage
        return column_lineage

    async def create_column_relation(
        self,
        key: str,
        fingerprint: UUID,
        column_relation_kwargs: dict | None = None,
    ) -> DatasetColumnRelation:
        """Create-or-return a DatasetColumnRelation for *fingerprint*, registered under *key*."""
        if key in self.column_relations:
            return self.column_relations[key]

        payload = dict(column_relation_kwargs or {})
        column_relation = await create_column_relation_model(
            async_session=self.async_session,
            fingerprint=fingerprint,
            column_relation_kwargs=payload,
        )
        self.column_relations[key] = column_relation
        self.by_id[str(column_relation.id)] = column_relation
        return column_relation

    async def create_personal_token(
        self,
        key: str,
        user: User,
        token_kwargs: dict | None = None,
    ) -> PersonalToken:
        """Create-or-return a PersonalToken for *user*, registered under *key*."""
        if key in self.personal_tokens:
            return self.personal_tokens[key]

        payload = dict(token_kwargs or {})
        token = await create_personal_token_model(
            async_session=self.async_session,
            user=user,
            token_kwargs=payload,
        )
        self.personal_tokens[key] = token
        self.by_id[str(token.id)] = token
        return token

    def register_direct_column_relation(
        self,
        column_lineage: ColumnLineage,
        source_relations: list[DatasetColumnRelation],
        target_relations: list[DatasetColumnRelation],
    ) -> None:
        """Record *column_lineage* as direct lineage with its source/target relations."""
        self.lineage_result.direct_column_relations[column_lineage.fingerprint] = {
            "source": source_relations,
            "target": target_relations,
        }
        self.lineage_result.direct_column_lineage.append(column_lineage)

    def register_indirect_column_relation(
        self,
        column_lineage: ColumnLineage,
        relations: list[DatasetColumnRelation],
    ) -> None:
        """Record *column_lineage* as indirect lineage with its relations."""
        self.lineage_result.indirect_column_relations[column_lineage.fingerprint] = relations
        self.lineage_result.indirect_column_lineage.append(column_lineage)

    def build(self) -> LineageResult:
        """Snapshot all registries into the shared LineageResult and return it."""
        self.lineage_result.jobs = list(self.jobs.values())
        self.lineage_result.runs = list(self.runs.values())
        self.lineage_result.operations = list(self.operations.values())
        self.lineage_result.datasets = list(self.datasets.values())
        self.lineage_result.inputs = list(self.inputs.values())
        self.lineage_result.outputs = list(self.outputs.values())
        self.lineage_result.dataset_symlinks = list(self.dataset_symlinks.values())
        # Fallback: if no relations were registered via register_*_column_relation,
        # expose every created ColumnLineage row as both direct AND indirect lineage.
        # NOTE(review): when only one of the two was registered, the other side is
        # filled with ALL rows (including already-registered ones) — confirm intended.
        if not self.lineage_result.direct_column_lineage:
            self.lineage_result.direct_column_lineage = list(self.column_lineage.values())
        if not self.lineage_result.indirect_column_lineage:
            self.lineage_result.indirect_column_lineage = list(self.column_lineage.values())
        return self.lineage_result