From d74a1b884d068fd1751944b93d3a32b18a0a45b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Fri, 3 Apr 2026 16:37:40 +0300 Subject: [PATCH 1/8] [DOP-34706] Include connections via symlink in hierarchy --- .../db/repositories/job_dependency.py | 22 +++- data_rentgen/server/settings/cors.py | 4 +- tests/test_server/fixtures/factories/job.py | 116 +++++++++++++++++- .../test_jobs/test_job_hierarchy.py | 43 +++++++ 4 files changed, 181 insertions(+), 4 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 5c8f6c17..faec6c9c 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -21,6 +21,7 @@ tuple_, ) +from data_rentgen.db.models.dataset_symlink import DatasetSymlink from data_rentgen.db.models.input import Input from data_rentgen.db.models.job_dependency import JobDependency from data_rentgen.db.models.output import Output @@ -133,6 +134,22 @@ def _get_core_hierarchy_query( JobDependency.type, ) if include_indirect: + datasets_connected_via_symlink = ( + select(literal(1)) + .where( + or_( + and_( + DatasetSymlink.from_dataset_id == Output.dataset_id, + DatasetSymlink.to_dataset_id == Input.dataset_id, + ), + and_( + DatasetSymlink.to_dataset_id == Output.dataset_id, + DatasetSymlink.from_dataset_id == Input.dataset_id, + ), + ), + ) + .exists() + ) query = query.union( select( Output.job_id.label("from_job_id"), @@ -142,7 +159,10 @@ def _get_core_hierarchy_query( .distinct() .join( Input, - Output.dataset_id == Input.dataset_id, + or_( + Output.dataset_id == Input.dataset_id, + datasets_connected_via_symlink, + ), ) .where( Input.created_at >= bindparam("since"), diff --git a/data_rentgen/server/settings/cors.py 
b/data_rentgen/server/settings/cors.py index 347fd54a..4763ad10 100644 --- a/data_rentgen/server/settings/cors.py +++ b/data_rentgen/server/settings/cors.py @@ -50,10 +50,10 @@ class CORSSettings(BaseModel): default=False, description="If ``True``, cookies should be supported for cross-origin request", ) - allow_methods: list[str] = Field(default=["GET"], description="HTTP Methods allowed for CORS") + allow_methods: list[str] = Field(default=["GET", "POST"], description="HTTP Methods allowed for CORS") # https://github.com/snok/asgi-correlation-id#cors allow_headers: list[str] = Field( - default=["X-Request-ID", "X-Request-With"], + default=["X-Request-ID", "X-Request-With", "Access-Control-Allow-Origin"], description="HTTP headers allowed for CORS", ) expose_headers: list[str] = Field(default=["X-Request-ID"], description="HTTP headers exposed from backend") diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index 482a28c0..fbad938e 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -7,9 +7,10 @@ import pytest_asyncio from data_rentgen.db.models import Job, JobDependency, TagValue +from data_rentgen.db.models.dataset_symlink import DatasetSymlinkType from data_rentgen.utils.uuid import generate_new_uuid from tests.test_server.fixtures.factories.base import random_string -from tests.test_server.fixtures.factories.dataset import create_dataset +from tests.test_server.fixtures.factories.dataset import create_dataset, make_symlink from tests.test_server.fixtures.factories.input import create_input from tests.test_server.fixtures.factories.job_type import create_job_type from tests.test_server.fixtures.factories.location import create_location @@ -597,3 +598,116 @@ async def job_dependency_chain_with_lineage( async with async_session_maker() as async_session: await clean_db(async_session) + + +@pytest_asyncio.fixture +async def 
job_dependency_chain_with_lineage_and_symlinks( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], +) -> AsyncGenerator[tuple[Job, ...], None]: + """ " + Jobs are connected via IO relations with symlinks: + - left_spark --Out--Symlink--In->spark (inferred via input/output relation) + - spark--Out--Symlink--In->right_spark (inferred via input/output relation) + """ + + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + left_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "left_spark"}, + ) + spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "spark"}, + ) + right_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "right_spark"}, + ) + + # Create datasets connected via symlinks. 
+ left_dataset_location = await create_location(async_session) + left_output_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + left_input_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + await make_symlink( + async_session=async_session, + from_dataset=left_output_dataset, + to_dataset=left_input_dataset, + type=DatasetSymlinkType.METASTORE, + ) + + right_dataset_location = await create_location(async_session) + right_output_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + right_input_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + await make_symlink( + async_session=async_session, + from_dataset=right_input_dataset, + to_dataset=right_output_dataset, + type=DatasetSymlinkType.WAREHOUSE, + ) + + # Connect left chain to central chain: left_spark -> spark + left_output_created_at = datetime.now(tz=UTC) + left_input_created_at = left_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": left_output_created_at, + "operation_id": generate_new_uuid(left_output_created_at), + "run_id": generate_new_uuid(left_output_created_at), + "job_id": left_spark.id, + "dataset_id": left_output_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": left_input_created_at, + "operation_id": generate_new_uuid(left_input_created_at), + "run_id": generate_new_uuid(left_input_created_at), + "job_id": spark.id, + "dataset_id": left_input_dataset.id, + "schema_id": None, + }, + ) + + # Connect central chain to right chain: spark3 -> right_spark + right_output_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) + right_input_created_at = right_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": right_output_created_at, + "operation_id": 
generate_new_uuid(right_output_created_at), + "run_id": generate_new_uuid(right_output_created_at), + "job_id": spark.id, + "dataset_id": right_output_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": right_input_created_at, + "operation_id": generate_new_uuid(right_input_created_at), + "run_id": generate_new_uuid(right_input_created_at), + "job_id": right_spark.id, + "dataset_id": right_input_dataset.id, + "schema_id": None, + }, + ) + + async_session.expunge_all() + + yield (left_spark, spark, right_spark) + + async with async_session_maker() as async_session: + await clean_db(async_session) diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index eff2899d..c3b2bce2 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -454,6 +454,49 @@ async def test_get_job_hierarchy_with_inferred_dependencies_with_since_and_until } +async def test_get_job_hierarchy_with_inferred_dependencies_with_symlinks( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_lineage_and_symlinks: tuple[Job, ...], + mocked_user: MockedUser, +): + left_spark, spark, right_spark = job_dependency_chain_with_lineage_and_symlinks + start_node = spark + + expected_nodes = await enrich_jobs([left_spark, spark, right_spark], async_session) + expected_deps = [ + (left_spark.id, spark.id, "INFERRED_FROM_LINEAGE"), + (spark.id, right_spark.id, "INFERRED_FROM_LINEAGE"), + ] + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + "since": datetime.min.replace(tzinfo=UTC).isoformat(), + }, + ) + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": 
{ + "parents": jobs_ancestors_to_json(expected_nodes), + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(from_id)}, + "to": {"kind": "JOB", "id": str(to_id)}, + "type": dep_type, + } + for from_id, to_id, dep_type in expected_deps + ], + }, + "nodes": {"jobs": jobs_to_json(expected_nodes)}, + } + + async def test_get_job_hierarchy_with_inferred_dependencies_without_since( test_client: AsyncClient, job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...], From 701667b2517f5b9f8188674e8a6d3b5557eda2c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Fri, 3 Apr 2026 17:44:23 +0300 Subject: [PATCH 2/8] [DOP-34706] Moving fixtures and tests into separate files --- tests/conftest.py | 1 + tests/test_server/fixtures/factories/job.py | 388 +---------------- .../fixtures/factories/job_dependencies.py | 403 ++++++++++++++++++ tests/test_server/test_hierarchy/__init__.py | 0 .../test_job_hierarchy.py | 0 5 files changed, 405 insertions(+), 387 deletions(-) create mode 100644 tests/test_server/fixtures/factories/job_dependencies.py create mode 100644 tests/test_server/test_hierarchy/__init__.py rename tests/test_server/{test_jobs => test_hierarchy}/test_job_hierarchy.py (100%) diff --git a/tests/conftest.py b/tests/conftest.py index f92dbab4..2ad6d3c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,6 +29,7 @@ "tests.test_server.fixtures.keycloak", "tests.test_server.fixtures.factories.address", "tests.test_server.fixtures.factories.dataset", + "tests.test_server.fixtures.factories.job_dependencies", "tests.test_server.fixtures.factories.job_type", "tests.test_server.fixtures.factories.job", "tests.test_server.fixtures.factories.lineage", diff --git a/tests/test_server/fixtures/factories/job.py 
b/tests/test_server/fixtures/factories/job.py index fbad938e..08a61baf 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -1,20 +1,14 @@ from __future__ import annotations -from datetime import UTC, datetime, timedelta from random import randint from typing import TYPE_CHECKING import pytest_asyncio -from data_rentgen.db.models import Job, JobDependency, TagValue -from data_rentgen.db.models.dataset_symlink import DatasetSymlinkType -from data_rentgen.utils.uuid import generate_new_uuid +from data_rentgen.db.models import Job, TagValue from tests.test_server.fixtures.factories.base import random_string -from tests.test_server.fixtures.factories.dataset import create_dataset, make_symlink -from tests.test_server.fixtures.factories.input import create_input from tests.test_server.fixtures.factories.job_type import create_job_type from tests.test_server.fixtures.factories.location import create_location -from tests.test_server.fixtures.factories.output import create_output from tests.test_server.utils.delete import clean_db if TYPE_CHECKING: @@ -331,383 +325,3 @@ async def jobs_with_same_parent_job( async with async_session_maker() as async_session: await clean_db(async_session) - - -@pytest_asyncio.fixture -async def job_dependency_depth_chain( - async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], -) -> AsyncGenerator[list[Job], None]: - """ - Linear dependency chain of 5 jobs: - - job_1 → job_2 → job_3 → job_4 → job_5 - - Each arrow is a JobDependency edge with type "DIRECT_DEPENDENCY". - Used for testing depth-limited dependency queries. 
- """ - async with async_session_maker() as async_session: - location = await create_location(async_session) - job_type = await create_job_type(async_session) - - jobs = [] - for i in range(1, 6): - job = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type.id, - job_kwargs={"name": f"depth-chain-job-{i}"}, - ) - jobs.append(job) - - async_session.add_all( - [ - JobDependency( - from_job_id=jobs[i].id, - to_job_id=jobs[i + 1].id, - type="DIRECT_DEPENDENCY", - ) - for i in range(len(jobs) - 1) - ], - ) - await async_session.commit() - async_session.expunge_all() - - yield jobs - - async with async_session_maker() as async_session: - await clean_db(async_session) - - -@pytest_asyncio.fixture -async def job_dependency_chain( - async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], -) -> AsyncGenerator[tuple[tuple[Job, Job, Job], ...], None]: - """ - Fixture that creates: - - Parent-child hierarchy: dag -> task -> spark via parent_job_id - - Job dependency edges: task1 -> task2 and task2 -> task3 - - There are no relations like Dag -> Dag and Spark -> Spark. 
- """ - async with async_session_maker() as async_session: - location = await create_location(async_session) - - job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) - dag1 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_dag.id, - job_kwargs={"name": "dag1"}, - ) - dag2 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_dag.id, - job_kwargs={"name": "dag2"}, - ) - dag3 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_dag.id, - job_kwargs={"name": "dag3"}, - ) - - job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) - task1 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_task.id, - job_kwargs={"name": "task1", "parent_job_id": dag1.id}, - ) - task2 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_task.id, - job_kwargs={"name": "task2", "parent_job_id": dag2.id}, - ) - task3 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_task.id, - job_kwargs={"name": "task3", "parent_job_id": dag3.id}, - ) - - job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) - spark1 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "spark1", "parent_job_id": task1.id}, - ) - spark2 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "spark2", "parent_job_id": task2.id}, - ) - spark3 = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "spark3", "parent_job_id": task3.id}, - ) - - async_session.add_all( - [ - JobDependency(from_job_id=task1.id, to_job_id=task2.id, type="DIRECT_DEPENDENCY"), - JobDependency(from_job_id=task2.id, to_job_id=task3.id, type="DIRECT_DEPENDENCY"), - ], - ) - await 
async_session.commit() - async_session.expunge_all() - - yield ( - (dag1, dag2, dag3), - (task1, task2, task3), - (spark1, spark2, spark3), - ) - - async with async_session_maker() as async_session: - await clean_db(async_session) - - -@pytest_asyncio.fixture -async def job_dependency_chain_with_lineage( - async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], - job_dependency_chain: tuple[tuple[Job, Job, Job], ...], -) -> AsyncGenerator[tuple[tuple[Job, Job, Job, Job, Job], ...], None]: - """ - Extends `job_dependency_chain` with two extra parent-child chains: - - left: left_dag -> left_task -> left_spark - - right: right_dag -> right_task -> right_spark - - The chains are connected to the central fixture on task level via IO relations: - - left_task -> task1 (inferred via input/output relation) - - task3 -> right_task (inferred via input/output relation) - """ - (dag1, dag2, dag3), (task1, task2, task3), (spark1, spark2, spark3) = job_dependency_chain - - async with async_session_maker() as async_session: - location = await create_location(async_session) - job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) - job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) - job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) - - left_dag = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_dag.id, - job_kwargs={"name": "left_dag"}, - ) - left_task = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_task.id, - job_kwargs={"name": "left_task", "parent_job_id": left_dag.id}, - ) - left_spark = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "left_spark", "parent_job_id": left_task.id}, - ) - - right_dag = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_dag.id, - job_kwargs={"name": "right_dag"}, - ) 
- right_task = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_task.id, - job_kwargs={"name": "right_task", "parent_job_id": right_dag.id}, - ) - right_spark = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "right_spark", "parent_job_id": right_task.id}, - ) - - left_dataset_location = await create_location(async_session) - left_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) - right_dataset_location = await create_location(async_session) - right_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) - - # Connect left chain to central chain: left_spark -> spark1 - left_output_created_at = datetime.now(tz=UTC) - left_input_created_at = left_output_created_at - timedelta(seconds=1) - await create_output( - async_session, - output_kwargs={ - "created_at": left_output_created_at, - "operation_id": generate_new_uuid(left_output_created_at), - "run_id": generate_new_uuid(left_output_created_at), - "job_id": left_spark.id, - "dataset_id": left_dataset.id, - "schema_id": None, - }, - ) - await create_input( - async_session, - input_kwargs={ - "created_at": left_input_created_at, - "operation_id": generate_new_uuid(left_input_created_at), - "run_id": generate_new_uuid(left_input_created_at), - "job_id": spark1.id, - "dataset_id": left_dataset.id, - "schema_id": None, - }, - ) - - # Connect central chain to right chain: spark3 -> right_spark - right_output_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) - right_input_created_at = right_output_created_at - timedelta(seconds=1) - await create_output( - async_session, - output_kwargs={ - "created_at": right_output_created_at, - "operation_id": generate_new_uuid(right_output_created_at), - "run_id": generate_new_uuid(right_output_created_at), - "job_id": spark3.id, - "dataset_id": right_dataset.id, - "schema_id": None, - }, - ) - await 
create_input( - async_session, - input_kwargs={ - "created_at": right_input_created_at, - "operation_id": generate_new_uuid(right_input_created_at), - "run_id": generate_new_uuid(right_input_created_at), - "job_id": right_spark.id, - "dataset_id": right_dataset.id, - "schema_id": None, - }, - ) - - async_session.expunge_all() - - yield ( - (left_dag, dag1, dag2, dag3, right_dag), - (left_task, task1, task2, task3, right_task), - (left_spark, spark1, spark2, spark3, right_spark), - ) - - async with async_session_maker() as async_session: - await clean_db(async_session) - - -@pytest_asyncio.fixture -async def job_dependency_chain_with_lineage_and_symlinks( - async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], -) -> AsyncGenerator[tuple[Job, ...], None]: - """ " - Jobs are connected via IO relations with symlinks: - - left_spark --Out--Symlink--In->spark (inferred via input/output relation) - - spark--Out--Symlink--In->right_spark (inferred via input/output relation) - """ - - async with async_session_maker() as async_session: - location = await create_location(async_session) - job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) - left_spark = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "left_spark"}, - ) - spark = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "spark"}, - ) - right_spark = await create_job( - async_session, - location_id=location.id, - job_type_id=job_type_spark.id, - job_kwargs={"name": "right_spark"}, - ) - - # Create datasets connected via symlinks. 
- left_dataset_location = await create_location(async_session) - left_output_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) - left_input_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) - await make_symlink( - async_session=async_session, - from_dataset=left_output_dataset, - to_dataset=left_input_dataset, - type=DatasetSymlinkType.METASTORE, - ) - - right_dataset_location = await create_location(async_session) - right_output_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) - right_input_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) - await make_symlink( - async_session=async_session, - from_dataset=right_input_dataset, - to_dataset=right_output_dataset, - type=DatasetSymlinkType.WAREHOUSE, - ) - - # Connect left chain to central chain: left_spark -> spark - left_output_created_at = datetime.now(tz=UTC) - left_input_created_at = left_output_created_at - timedelta(seconds=1) - await create_output( - async_session, - output_kwargs={ - "created_at": left_output_created_at, - "operation_id": generate_new_uuid(left_output_created_at), - "run_id": generate_new_uuid(left_output_created_at), - "job_id": left_spark.id, - "dataset_id": left_output_dataset.id, - "schema_id": None, - }, - ) - await create_input( - async_session, - input_kwargs={ - "created_at": left_input_created_at, - "operation_id": generate_new_uuid(left_input_created_at), - "run_id": generate_new_uuid(left_input_created_at), - "job_id": spark.id, - "dataset_id": left_input_dataset.id, - "schema_id": None, - }, - ) - - # Connect central chain to right chain: spark3 -> right_spark - right_output_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) - right_input_created_at = right_output_created_at - timedelta(seconds=1) - await create_output( - async_session, - output_kwargs={ - "created_at": right_output_created_at, - "operation_id": 
generate_new_uuid(right_output_created_at), - "run_id": generate_new_uuid(right_output_created_at), - "job_id": spark.id, - "dataset_id": right_output_dataset.id, - "schema_id": None, - }, - ) - await create_input( - async_session, - input_kwargs={ - "created_at": right_input_created_at, - "operation_id": generate_new_uuid(right_input_created_at), - "run_id": generate_new_uuid(right_input_created_at), - "job_id": right_spark.id, - "dataset_id": right_input_dataset.id, - "schema_id": None, - }, - ) - - async_session.expunge_all() - - yield (left_spark, spark, right_spark) - - async with async_session_maker() as async_session: - await clean_db(async_session) diff --git a/tests/test_server/fixtures/factories/job_dependencies.py b/tests/test_server/fixtures/factories/job_dependencies.py new file mode 100644 index 00000000..8117ecdc --- /dev/null +++ b/tests/test_server/fixtures/factories/job_dependencies.py @@ -0,0 +1,403 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING + +import pytest_asyncio + +from data_rentgen.db.models import Job, JobDependency +from data_rentgen.db.models.dataset_symlink import DatasetSymlinkType +from data_rentgen.utils.uuid import generate_new_uuid +from tests.test_server.fixtures.factories.dataset import create_dataset, make_symlink +from tests.test_server.fixtures.factories.input import create_input +from tests.test_server.fixtures.factories.job import create_job +from tests.test_server.fixtures.factories.job_type import create_job_type +from tests.test_server.fixtures.factories.location import create_location +from tests.test_server.fixtures.factories.output import create_output +from tests.test_server.utils.delete import clean_db + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Callable + from contextlib import AbstractAsyncContextManager + + from sqlalchemy.ext.asyncio import AsyncSession + + +@pytest_asyncio.fixture +async def 
job_dependency_depth_chain( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], +) -> AsyncGenerator[list[Job], None]: + """ + Linear dependency chain of 5 jobs: + + job_1 → job_2 → job_3 → job_4 → job_5 + + Each arrow is a JobDependency edge with type "DIRECT_DEPENDENCY". + Used for testing depth-limited dependency queries. + """ + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type = await create_job_type(async_session) + + jobs = [] + for i in range(1, 6): + job = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type.id, + job_kwargs={"name": f"depth-chain-job-{i}"}, + ) + jobs.append(job) + + async_session.add_all( + [ + JobDependency( + from_job_id=jobs[i].id, + to_job_id=jobs[i + 1].id, + type="DIRECT_DEPENDENCY", + ) + for i in range(len(jobs) - 1) + ], + ) + await async_session.commit() + async_session.expunge_all() + + yield jobs + + async with async_session_maker() as async_session: + await clean_db(async_session) + + +@pytest_asyncio.fixture +async def job_dependency_chain( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], +) -> AsyncGenerator[tuple[tuple[Job, Job, Job], ...], None]: + """ + Fixture that creates: + - Parent-child hierarchy: dag -> task -> spark via parent_job_id + - Job dependency edges: task1 -> task2 and task2 -> task3 + + There are no relations like Dag -> Dag and Spark -> Spark. 
+ """ + async with async_session_maker() as async_session: + location = await create_location(async_session) + + job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) + dag1 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "dag1"}, + ) + dag2 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "dag2"}, + ) + dag3 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "dag3"}, + ) + + job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) + task1 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "task1", "parent_job_id": dag1.id}, + ) + task2 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "task2", "parent_job_id": dag2.id}, + ) + task3 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "task3", "parent_job_id": dag3.id}, + ) + + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + spark1 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "spark1", "parent_job_id": task1.id}, + ) + spark2 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "spark2", "parent_job_id": task2.id}, + ) + spark3 = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "spark3", "parent_job_id": task3.id}, + ) + + async_session.add_all( + [ + JobDependency(from_job_id=task1.id, to_job_id=task2.id, type="DIRECT_DEPENDENCY"), + JobDependency(from_job_id=task2.id, to_job_id=task3.id, type="DIRECT_DEPENDENCY"), + ], + ) + await 
async_session.commit() + async_session.expunge_all() + + yield ( + (dag1, dag2, dag3), + (task1, task2, task3), + (spark1, spark2, spark3), + ) + + async with async_session_maker() as async_session: + await clean_db(async_session) + + +@pytest_asyncio.fixture +async def job_dependency_chain_with_lineage( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], + job_dependency_chain: tuple[tuple[Job, Job, Job], ...], +) -> AsyncGenerator[tuple[tuple[Job, Job, Job, Job, Job], ...], None]: + """ + Extends `job_dependency_chain` with two extra parent-child chains: + - left: left_dag -> left_task -> left_spark + - right: right_dag -> right_task -> right_spark + + The chains are connected to the central fixture on task level via IO relations: + - left_task -> task1 (inferred via input/output relation) + - task3 -> right_task (inferred via input/output relation) + """ + (dag1, dag2, dag3), (task1, task2, task3), (spark1, spark2, spark3) = job_dependency_chain + + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) + job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + + left_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "left_dag"}, + ) + left_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "left_task", "parent_job_id": left_dag.id}, + ) + left_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "left_spark", "parent_job_id": left_task.id}, + ) + + right_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "right_dag"}, + ) 
+ right_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "right_task", "parent_job_id": right_dag.id}, + ) + right_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "right_spark", "parent_job_id": right_task.id}, + ) + + left_dataset_location = await create_location(async_session) + left_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + right_dataset_location = await create_location(async_session) + right_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + + # Connect left chain to central chain: left_spark -> spark1 + left_output_created_at = datetime.now(tz=UTC) + left_input_created_at = left_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": left_output_created_at, + "operation_id": generate_new_uuid(left_output_created_at), + "run_id": generate_new_uuid(left_output_created_at), + "job_id": left_spark.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": left_input_created_at, + "operation_id": generate_new_uuid(left_input_created_at), + "run_id": generate_new_uuid(left_input_created_at), + "job_id": spark1.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + + # Connect central chain to right chain: spark3 -> right_spark + right_output_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) + right_input_created_at = right_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": right_output_created_at, + "operation_id": generate_new_uuid(right_output_created_at), + "run_id": generate_new_uuid(right_output_created_at), + "job_id": spark3.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + await 
create_input( + async_session, + input_kwargs={ + "created_at": right_input_created_at, + "operation_id": generate_new_uuid(right_input_created_at), + "run_id": generate_new_uuid(right_input_created_at), + "job_id": right_spark.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + + async_session.expunge_all() + + yield ( + (left_dag, dag1, dag2, dag3, right_dag), + (left_task, task1, task2, task3, right_task), + (left_spark, spark1, spark2, spark3, right_spark), + ) + + async with async_session_maker() as async_session: + await clean_db(async_session) + + +@pytest_asyncio.fixture +async def job_dependency_chain_with_lineage_and_symlinks( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], +) -> AsyncGenerator[tuple[Job, ...], None]: + """ + Jobs are connected via IO relations with symlinks: + - left_spark --Out--Symlink--In->spark (inferred via input/output relation) + - spark --Out--Symlink--In->right_spark (inferred via input/output relation) + """ + + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + left_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "left_spark"}, + ) + spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "spark"}, + ) + right_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "right_spark"}, + ) + + # Create datasets connected via symlinks. 
left_dataset_location = await create_location(async_session) + left_output_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + left_input_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + await make_symlink( + async_session=async_session, + from_dataset=left_output_dataset, + to_dataset=left_input_dataset, + type=DatasetSymlinkType.METASTORE, + ) + + right_dataset_location = await create_location(async_session) + right_output_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + right_input_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + await make_symlink( + async_session=async_session, + from_dataset=right_input_dataset, + to_dataset=right_output_dataset, + type=DatasetSymlinkType.WAREHOUSE, + ) + + # Connect left chain to central chain: left_spark -> spark + left_output_created_at = datetime.now(tz=UTC) + left_input_created_at = left_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": left_output_created_at, + "operation_id": generate_new_uuid(left_output_created_at), + "run_id": generate_new_uuid(left_output_created_at), + "job_id": left_spark.id, + "dataset_id": left_output_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": left_input_created_at, + "operation_id": generate_new_uuid(left_input_created_at), + "run_id": generate_new_uuid(left_input_created_at), + "job_id": spark.id, + "dataset_id": left_input_dataset.id, + "schema_id": None, + }, + ) + + # Connect central chain to right chain: spark -> right_spark + right_output_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) + right_input_created_at = right_output_created_at - timedelta(seconds=1) + await create_output( + async_session, + output_kwargs={ + "created_at": right_output_created_at, + "operation_id": 
generate_new_uuid(right_output_created_at), + "run_id": generate_new_uuid(right_output_created_at), + "job_id": spark.id, + "dataset_id": right_output_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": right_input_created_at, + "operation_id": generate_new_uuid(right_input_created_at), + "run_id": generate_new_uuid(right_input_created_at), + "job_id": right_spark.id, + "dataset_id": right_input_dataset.id, + "schema_id": None, + }, + ) + + async_session.expunge_all() + + yield (left_spark, spark, right_spark) + + async with async_session_maker() as async_session: + await clean_db(async_session) diff --git a/tests/test_server/test_hierarchy/__init__.py b/tests/test_server/test_hierarchy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_hierarchy/test_job_hierarchy.py similarity index 100% rename from tests/test_server/test_jobs/test_job_hierarchy.py rename to tests/test_server/test_hierarchy/test_job_hierarchy.py From 63c899c36cbec523fe74ad4ebb2e32bf0c130cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 11:07:54 +0300 Subject: [PATCH 3/8] [DOP-34706] Separate join in two unions --- .../db/repositories/job_dependency.py | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index faec6c9c..991f9682 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -150,7 +150,20 @@ def _get_core_hierarchy_query( ) .exists() ) - query = query.union( + where_clauses = [ + Input.created_at >= bindparam("since"), + 
Output.created_at >= bindparam("since"), + Output.created_at >= Input.created_at, + Output.job_id != Input.job_id, + or_( + bindparam("until", type_=DateTime(timezone=True)).is_(None), + and_( + Input.created_at <= bindparam("until"), + Output.created_at <= bindparam("until"), + ), + ), + ] + direct_connection = ( select( Output.job_id.label("from_job_id"), Input.job_id.label("to_job_id"), @@ -159,23 +172,24 @@ def _get_core_hierarchy_query( .distinct() .join( Input, - or_( - Output.dataset_id == Input.dataset_id, - datasets_connected_via_symlink, - ), + Output.dataset_id == Input.dataset_id, ) - .where( - Input.created_at >= bindparam("since"), - Output.created_at >= bindparam("since"), - Output.created_at >= Input.created_at, - Output.job_id != Input.job_id, - or_( - bindparam("until", type_=DateTime(timezone=True)).is_(None), - and_( - Input.created_at <= bindparam("until"), - Output.created_at <= bindparam("until"), - ), - ), + .where(*where_clauses) + ) + via_symlinks = ( + select( + Output.job_id.label("from_job_id"), + Input.job_id.label("to_job_id"), + literal("INFERRED_FROM_LINEAGE").label("type"), ) + .distinct() + .join( + Input, + datasets_connected_via_symlink, + ) + .where(*where_clauses) ) + + query = query.union(direct_connection, via_symlinks) + return query.cte("jobs_hierarchy_core_query").prefix_with("NOT MATERIALIZED", dialect="postgresql") From 16dfbd733a53c4317a0a5b44cb79732c3d897825 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 13:00:44 +0300 Subject: [PATCH 4/8] [DOP-34706] Separate IO queries --- .../db/repositories/job_dependency.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 
991f9682..56061ce5 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -134,22 +134,6 @@ def _get_core_hierarchy_query( JobDependency.type, ) if include_indirect: - datasets_connected_via_symlink = ( - select(literal(1)) - .where( - or_( - and_( - DatasetSymlink.from_dataset_id == Output.dataset_id, - DatasetSymlink.to_dataset_id == Input.dataset_id, - ), - and_( - DatasetSymlink.to_dataset_id == Output.dataset_id, - DatasetSymlink.from_dataset_id == Input.dataset_id, - ), - ), - ) - .exists() - ) where_clauses = [ Input.created_at >= bindparam("since"), Output.created_at >= bindparam("since"), @@ -163,6 +147,7 @@ def _get_core_hierarchy_query( ), ), ] + # IO connections via same dataset direct_connection = ( select( Output.job_id.label("from_job_id"), @@ -176,20 +161,37 @@ def _get_core_hierarchy_query( ) .where(*where_clauses) ) - via_symlinks = ( + # IO connections Output.d_id == Symlink.to_d_id Symlink.from_d_id == Input.d_id + via_symlinks_from_output = ( select( Output.job_id.label("from_job_id"), Input.job_id.label("to_job_id"), literal("INFERRED_FROM_LINEAGE").label("type"), ) .distinct() + .join(DatasetSymlink, Output.dataset_id == DatasetSymlink.to_dataset_id) .join( Input, - datasets_connected_via_symlink, + DatasetSymlink.from_dataset_id == Input.dataset_id, + ) + .where(*where_clauses) + ) + # IO connections Input.d_id == Symlink.to_d_id Symlink.from_d_id == Output.d_id + via_symlinks_from_input = ( + select( + Output.job_id.label("from_job_id"), + Input.job_id.label("to_job_id"), + literal("INFERRED_FROM_LINEAGE").label("type"), + ) + .distinct() + .join(DatasetSymlink, Input.dataset_id == DatasetSymlink.to_dataset_id) + .join( + Output, + DatasetSymlink.from_dataset_id == Output.dataset_id, ) .where(*where_clauses) ) - query = query.union(direct_connection, via_symlinks) + query = query.union(direct_connection, via_symlinks_from_input, via_symlinks_from_output) return 
query.cte("jobs_hierarchy_core_query").prefix_with("NOT MATERIALIZED", dialect="postgresql") From 2c02164cc694d26cfd40438107b79949a63438e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 13:35:56 +0300 Subject: [PATCH 5/8] [DOP-34706] move select to variable --- .../db/repositories/job_dependency.py | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 56061ce5..b2a4bce3 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -134,6 +134,7 @@ def _get_core_hierarchy_query( JobDependency.type, ) if include_indirect: + # Where clause and columns are common part for all unions where_clauses = [ Input.created_at >= bindparam("since"), Output.created_at >= bindparam("since"), @@ -147,29 +148,20 @@ def _get_core_hierarchy_query( ), ), ] + inferred_columns = select( + Output.job_id.label("from_job_id"), + Input.job_id.label("to_job_id"), + literal("INFERRED_FROM_LINEAGE").label("type"), + ).distinct() + # IO connections via same dataset - direct_connection = ( - select( - Output.job_id.label("from_job_id"), - Input.job_id.label("to_job_id"), - literal("INFERRED_FROM_LINEAGE").label("type"), - ) - .distinct() - .join( - Input, - Output.dataset_id == Input.dataset_id, - ) - .where(*where_clauses) - ) + direct_connection = inferred_columns.join( + Input, + Output.dataset_id == Input.dataset_id, + ).where(*where_clauses) # IO connections Output.d_id == Symlink.to_d_id Symlink.from_d_id == Input.d_id via_symlinks_from_output = ( - select( - Output.job_id.label("from_job_id"), - Input.job_id.label("to_job_id"), - literal("INFERRED_FROM_LINEAGE").label("type"), - ) - 
.distinct() - .join(DatasetSymlink, Output.dataset_id == DatasetSymlink.to_dataset_id) + inferred_columns.join(DatasetSymlink, Output.dataset_id == DatasetSymlink.to_dataset_id) .join( Input, DatasetSymlink.from_dataset_id == Input.dataset_id, @@ -178,13 +170,7 @@ def _get_core_hierarchy_query( ) # IO connections Input.d_id == Symlink.to_d_id Symlink.from_d_id == Output.d_id via_symlinks_from_input = ( - select( - Output.job_id.label("from_job_id"), - Input.job_id.label("to_job_id"), - literal("INFERRED_FROM_LINEAGE").label("type"), - ) - .distinct() - .join(DatasetSymlink, Input.dataset_id == DatasetSymlink.to_dataset_id) + inferred_columns.join(DatasetSymlink, Input.dataset_id == DatasetSymlink.to_dataset_id) .join( Output, DatasetSymlink.from_dataset_id == Output.dataset_id, From 9f5d31019e8966eef14c3332d0f275ebce8bb195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 16:05:15 +0300 Subject: [PATCH 6/8] [DOP-34706] remove prefix from query --- data_rentgen/db/factory.py | 2 +- data_rentgen/db/repositories/job_dependency.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data_rentgen/db/factory.py b/data_rentgen/db/factory.py index 4878b04e..66daf278 100644 --- a/data_rentgen/db/factory.py +++ b/data_rentgen/db/factory.py @@ -12,7 +12,7 @@ def create_session_factory(settings: DatabaseSettings) -> async_sessionmaker[AsyncSession]: - engine = async_engine_from_config(settings.model_dump(), prefix="") + engine = async_engine_from_config(settings.model_dump(), prefix="", echo=True) return async_sessionmaker( bind=engine, class_=AsyncSession, diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index b2a4bce3..8e3c8b62 100644 --- a/data_rentgen/db/repositories/job_dependency.py 
+++ b/data_rentgen/db/repositories/job_dependency.py @@ -180,4 +180,4 @@ def _get_core_hierarchy_query( query = query.union(direct_connection, via_symlinks_from_input, via_symlinks_from_output) - return query.cte("jobs_hierarchy_core_query").prefix_with("NOT MATERIALIZED", dialect="postgresql") + return query.cte("jobs_hierarchy_core_query") From 61045ba9ca46a5f26bea87a5e26424dad6f79d99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 16:05:39 +0300 Subject: [PATCH 7/8] [DOP-34706] remove prefix from query --- data_rentgen/db/factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_rentgen/db/factory.py b/data_rentgen/db/factory.py index 66daf278..4878b04e 100644 --- a/data_rentgen/db/factory.py +++ b/data_rentgen/db/factory.py @@ -12,7 +12,7 @@ def create_session_factory(settings: DatabaseSettings) -> async_sessionmaker[AsyncSession]: - engine = async_engine_from_config(settings.model_dump(), prefix="", echo=True) + engine = async_engine_from_config(settings.model_dump(), prefix="") return async_sessionmaker( bind=engine, class_=AsyncSession, From 9fa5923c54fd0a80e666077e9f3ee5037800fea4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Mon, 6 Apr 2026 16:29:11 +0300 Subject: [PATCH 8/8] [DOP-34706] remove CTE for core part of the query --- data_rentgen/db/repositories/job_dependency.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 8e3c8b62..ef510a62 100644 --- 
a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -5,7 +5,6 @@ from sqlalchemy import ( ARRAY, - CTE, CompoundSelect, DateTime, Integer, @@ -105,13 +104,14 @@ async def get_dependencies( infer_from_lineage: bool = False, ) -> list[JobDependency]: core_query = self._get_core_hierarchy_query(include_indirect=infer_from_lineage) + core_subquery = core_query.subquery() - query: Select | CompoundSelect + query: Select match direction: case "UPSTREAM": - query = select(core_query).where(core_query.c.to_job_id == any_(bindparam("job_ids"))) + query = select(core_subquery).where(core_subquery.c.to_job_id == any_(bindparam("job_ids"))) case "DOWNSTREAM": - query = select(core_query).where(core_query.c.from_job_id == any_(bindparam("job_ids"))) + query = select(core_subquery).where(core_subquery.c.from_job_id == any_(bindparam("job_ids"))) result = await self._session.execute( query, @@ -126,7 +126,7 @@ def _get_core_hierarchy_query( self, *, include_indirect: bool = False, - ) -> CTE: + ) -> Select | CompoundSelect: query: Select | CompoundSelect query = select( JobDependency.from_job_id, @@ -180,4 +180,4 @@ def _get_core_hierarchy_query( query = query.union(direct_connection, via_symlinks_from_input, via_symlinks_from_output) - return query.cte("jobs_hierarchy_core_query") + return query